blob: 9ca0fabf1360d13999a5d8154c0d4dc733333480 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_verify.py
# @date 2015/02/04 15:26:21
# @brief This file is a test file for palo data loading and verifying.
#
#############################################################################
"""
测试各种数据类型和存储方式的数据正确性
"""
import pytest
from data import verify as VERIFY_DATA
from lib import palo_config
from lib import palo_client
from lib import util
from lib import common
# Shared test-suite configuration handle (FE host/port, credentials, broker).
config = palo_config.config
# Logging helpers re-exported by the client library: LOG is the logger,
# L builds a structured key=value log message.
LOG = palo_client.LOG
L = palo_client.L
# Broker used for broker-based loads, and its connection info.
broker_name = config.broker_name
broker_info = palo_config.broker_info
def setup_module():
    """
    Module-level setUp: create the global Palo client and skip the whole
    module when the configured broker is not registered on the cluster.
    """
    global client
    client = palo_client.PaloClient(config.fe_host, config.fe_query_port,
                                    user=config.fe_user, password=config.fe_password)
    client.init()
    try:
        brokers = client.get_broker_list()
        # Broker row format: first column is the broker name.
        is_exist = any(broker_name == br[0] for br in brokers)
    except Exception:
        # Best-effort: if the broker list cannot be fetched, fall through
        # and let the individual tests report the real connection error.
        return
    if not is_exist:
        # BUGFIX: the original raised pytest.skip() inside a bare
        # `try/except: pass`. pytest's Skipped outcome derives from
        # BaseException, so the bare except swallowed it and the module
        # was never skipped. Skip outside the try so it propagates.
        pytest.skip('no broker')
def check_table_load_and_verify(table_name):
    """
    Check the table was created, then load both data files into it and
    verify the stored rows against the expected result files.

    :param table_name: name of the table to load and verify
    """
    assert client.show_tables(table_name)
    assert client.get_index(table_name)
    # First batch load.
    data_desc_list = palo_client.LoadDataInfo(VERIFY_DATA.file_path_1, table_name)
    # Assert the load succeeded; the original ignored batch_load's return
    # value (inconsistent with test_same_name_diff_len, which asserts it).
    assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert client.verify(VERIFY_DATA.expected_data_file_list_1, table_name)
    # Second batch load on top of the first; verify the aggregated result.
    data_desc_list = palo_client.LoadDataInfo(VERIFY_DATA.file_path_2, table_name)
    assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert client.verify(VERIFY_DATA.expected_data_file_list_2, table_name)
def test_column_sum():
    """
    {
    "title": "test_sys_verify.test_column_sum",
    "describe": "测试列存储sum聚合方式",
    "tag": "system,p0"
    }
    """
    # Column storage with the SUM aggregation model.
    db_name, tbl_name, idx_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=db_name, table_name=tbl_name, index_name=idx_name))
    client.clean(db_name)
    client.create_database(db_name)
    client.create_table(tbl_name, VERIFY_DATA.schema_1,
                        storage_type='column', keys_desc='AGGREGATE KEY (K1)')
    check_table_load_and_verify(tbl_name)
    client.clean(db_name)
def test_column_max():
    """
    {
    "title": "test_sys_verify.test_column_max",
    "describe": "测试列存储max聚合方式",
    "tag": "system,p0"
    }
    """
    # Column storage with the MAX aggregation model.
    db_name, tbl_name, idx_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=db_name, table_name=tbl_name, index_name=idx_name))
    client.clean(db_name)
    client.create_database(db_name)
    client.create_table(tbl_name, VERIFY_DATA.schema_2,
                        storage_type='column', keys_desc='AGGREGATE KEY (K1)')
    check_table_load_and_verify(tbl_name)
    client.clean(db_name)
def test_column_min():
    """
    {
    "title": "test_sys_verify.test_column_min",
    "describe": "测试列存储min聚合方式",
    "tag": "system,p0"
    }
    """
    # Column storage with the MIN aggregation model.
    db_name, tbl_name, idx_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=db_name, table_name=tbl_name, index_name=idx_name))
    client.clean(db_name)
    client.create_database(db_name)
    client.create_table(tbl_name, VERIFY_DATA.schema_3,
                        storage_type='column', keys_desc='AGGREGATE KEY (K1)')
    check_table_load_and_verify(tbl_name)
    client.clean(db_name)
def test_column_replace():
    """
    {
    "title": "test_sys_verify.test_column_replace",
    "describe": "测试列存储replace聚合方式",
    "tag": "system,p0"
    }
    """
    # Column storage with the REPLACE aggregation model.
    db_name, tbl_name, idx_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=db_name, table_name=tbl_name, index_name=idx_name))
    client.clean(db_name)
    client.create_database(db_name)
    client.create_table(tbl_name, VERIFY_DATA.schema_4,
                        storage_type='column', keys_desc='AGGREGATE KEY (k1)')
    check_table_load_and_verify(tbl_name)
    client.clean(db_name)
def test_same_name_diff_len():
    """
    {
    "title": "test_sys_verify.test_same_name_diff_len",
    "describe": "查询的数据越界,tinyint的大小为[-128, 127]你们可以查小于这个最小值,以及大于这个最大值的查询",
    "tag": "system,p0"
    }
    """
    # 1. Regression for an out-of-range-query core dump: query below the
    #    minimum and above the maximum of a column's type (tinyint is
    #    [-128, 127]). The two tables share column names with different
    #    widths, e.g. int vs tinyint, or varchar(10) vs varchar(20).
    # 2. Overlong char/varchar predicates must be handled; for varchar(5)
    #    the rewrite rules are:
    #       k1 <  'aaaaab' --> k1 <= 'aaaaa'  (truncate and rewrite)
    #       k1 <= 'aaaaab' --> k1 <= 'aaaaa'  (truncate only)
    #       k1 =  'aaaaab' --> not pushed down
    #       k1 >  'aaaaab' --> k1 >  'aaaaa'  (truncate only)
    #       k1 >= 'aaaaab' --> k1 >  'aaaaa'  (truncate and rewrite)
    db_name, tbl_name, idx_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=db_name, table_name=tbl_name, index_name=idx_name))
    client.clean(db_name)
    client.create_database(db_name)
    client.use(db_name)
    table_name_a = 'table_a'
    table_name_b = 'table_b'
    keys = 'AGGREGATE KEY (K1, k2, k3, k4, k5, k6, k7, k8, k9, k10)'
    client.create_table(table_name_a, VERIFY_DATA.schema_5, storage_type='column', keys_desc=keys)
    client.create_table(table_name_b, VERIFY_DATA.schema_6, storage_type='column', keys_desc=keys)
    assert client.show_tables(table_name_a)
    assert client.show_tables(table_name_b)
    # Load the same data file into both tables.
    for target in (table_name_a, table_name_b):
        load_info = palo_client.LoadDataInfo(VERIFY_DATA.file_path_3, target)
        assert client.batch_load(util.get_label(), load_info, is_wait=True, broker=broker_info)
    # Out-of-range tinyint predicates must return the empty set.
    assert client.execute('select * from table_a where k1 > 130') == ()
    assert client.execute('select * from table_a where k1 < -130') == ()
    sql = 'select a.k1 as a_k1, b.k1 as b_k1, a.k9 as a_k9, b.k9 as b_k9 from table_a as a, ' \
          'table_b as b where a.k9 = b.k9 order by a_k1, b_k1, a_k9, b_k9'
    assert common.check_by_file(VERIFY_DATA.expected_file_1, sql=sql, client=client)
    sql = "select a.k1 as a_k1, b.k1 as b_k1, a.k9 as a_k9, b.k9 as b_k9 " \
          "from table_a as a, table_b as b " \
          "where a.k9 > b.k9 and a.k9 = 'vzb' and b.k9 = 'ddsc' order by a_k1, b_k1, a_k9, b_k9"
    assert common.check_by_file(VERIFY_DATA.expected_file_2, sql=sql, client=client)
    sql = "select a.k1 as a_k1, b.k1 as b_k1, a.k9 as a_k9, b.k9 as b_k9 " \
          "from table_a as a, table_b as b " \
          "where a.k9 < b.k9 and a.k9 = 'ddsc' and b.k9 = 'vzb' order by a_k1, b_k1, a_k9, b_k9"
    assert common.check_by_file(VERIFY_DATA.expected_file_3, sql=sql, client=client)
    # NOTICE: k9 is varchar(10); a full-width value that is absent from
    # the data must yield no rows.
    assert client.execute("select k9 from table_a where k9 = 'aaaaaaaaaa'") == ()
    # TODO(review): overlong (11-char) predicates are only printed, not
    # asserted — results should be pinned once the rewrite rules settle.
    print(client.execute("select k9 from table_a where k9 = 'aaaaaaaaaaa'"))
    print(client.execute("select k9 from table_a where k9 > 'aaaaaaaaaaa'"))
    print(client.execute("select k9 from table_a where k9 < 'aaaaaaaaaaa'"))
    client.clean(db_name)
def test_not_support_row():
    """
    {
    "title": "test_sys_verify.test_not_support_row",
    "describe": "不支持row存储,目前client中create_table,统一建column,不支持指定storage_type",
    "tag": "system,p0,fuzz"
    }
    """
    # Row storage is not supported: create_table must either raise or
    # report failure when storage_type='row' is requested.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name,
               table_name=table_name, index_name=index_name))
    client.clean(database_name)
    client.create_database(database_name)
    ret = None
    try:
        ret = client.create_table(table_name,
                                  VERIFY_DATA.schema_4, storage_type='row',
                                  keys_desc='AGGREGATE KEY (K1)')
    except Exception:
        # Raising is an acceptable way of rejecting row storage.
        ret = None
    # BUGFIX: the original `assert not ret` was inside the try block, so
    # its AssertionError was swallowed by `except Exception: pass` and
    # the test could never fail. Assert outside the try instead.
    assert not ret
    client.clean(database_name)
def teardown_module():
    """
    Module-level tearDown: nothing to release — each test cleans up its
    own database via client.clean().
    """
if __name__ == '__main__':
    # Run the module setup directly for ad-hoc manual runs.
    # BUGFIX: removed an unconditional `pdb.set_trace()` debugging
    # leftover that dropped every direct invocation into the debugger.
    setup_module()