#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_load_parse_from_path.py
# @date 2019-09-10 16:02:05
# @brief This file is a test file for broker load parse columns from file path
#
#############################################################################
"""
测试broker load,从文件路径中解析数据
"""
import os
import sys
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(file_dir)
file_dir = os.path.abspath(os.path.dirname(__file__))
from data import schema as DATA
from data import load_file as FILE
from lib import palo_config
from lib import palo_client
from lib import util
client = None
LOG = palo_client.LOG
L = palo_client.L
config = palo_config.config
broker_info = palo_config.broker_info


def setup_module():
    """Set up the global palo_client connection used by all tests."""
global client
client = palo_client.PaloClient(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password)
client.init()


def init_check_table(db_name, check_table='check_tb'):
    """Create and load a reference table used to verify parsed load results."""
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(check_table, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
database_name=db_name, set_null=True)
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, check_table,
column_name_list=column_name_list)
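    # Baseline load with no path parsing: all 11 columns come straight from
    # the file, so this table reflects the raw file contents for comparison.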
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info,
database_name=db_name)
assert ret


def check2(sql1, sql2):
    """Run both queries and assert that their result sets are identical."""
ret1 = client.execute(sql1)
ret2 = client.execute(sql2)
LOG.info(L('sql', sql=sql1))
LOG.info(L('check sql', check_sql=sql2))
util.check(ret1, ret2)


def test_parse_key_from_path():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_key_from_path",
    "describe": "parse partition key from file path; parse key columns and the partition column from the path",
    "tag": "system,p1"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
assert ret
column_name_list = ['tmp_k1', 'tmp_k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
column_from_path = ['k1', 'k2', 'city']
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, table_name,
column_name_list=column_name_list, columns_from_path=column_from_path)
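    # tmp_k1/tmp_k2 are placeholders that discard the file's first two
    # columns; k1, k2 and city are filled from the file path instead. A rough
    # sketch of the Broker Load this maps to (hypothetical label and path,
    # assuming a layout like .../k1=-1/k2=0/city=bj/<file>; the real SQL is
    # built inside palo_client):
    #   LOAD LABEL example_db.example_label (
    #       DATA INFILE("hdfs://host:port/.../k1=-1/k2=0/city=bj/*")
    #       INTO TABLE tbl
    #       (tmp_k1, tmp_k2, k3, k4, k5, v1, v2, v3, v4, v5, v6)
    #       COLUMNS FROM PATH AS (k1, k2, city)
    #   )
    #   WITH BROKER ...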
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert ret
check_table = 'check_tb'
init_check_table(database_name)
sql1 = 'select * from %s order by 1, 2, 3' % table_name
sql2 = 'select -1, 0, k3, k4, k5, v1, v2, v3, v4, v5, v6 from %s order by k3' % check_table
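    # The constants -1 and 0 mirror the k1/k2 values presumably encoded in
    # the load file's path (see the sketch above), shared by every loaded row.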
check2(sql1, sql2)
client.clean(database_name)


def test_parse_from_path_set():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_from_path_set",
    "describe": "parse a value column from the file path and use the parsed column in a SET expression",
    "tag": "system,p1"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
assert ret
column_name_list = ['tmp_k1', 'tmp_k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'tmp_v3', 'v4', 'v5', 'v6']
column_from_path = ['k1', 'k2', 'city']
set_list = ['v3=city']
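    # city exists only in the file path; SET routes it into table column v3,
    # while the placeholder tmp_v3 discards the file's own v3 field.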
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, table_name, set_list=set_list,
column_name_list=column_name_list, columns_from_path=column_from_path)
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert ret
check_table = 'check_tb'
init_check_table(database_name)
sql1 = 'select * from %s order by 1, 2, 3' % table_name
sql2 = 'select -1, 0, k3, k4, k5, v1, v2, "bj", v4, v5, v6 from %s order by k3' % check_table
check2(sql1, sql2)
client.clean(database_name)


def test_parse_special_from_path():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_special_from_path",
    "describe": "parse the special value empty string from the path into a char column",
    "tag": "system,p1,fuzz"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'tmp_v3', 'v4', 'v5', 'v6']
column_from_path = ['city']
set_list = ['v3=city']
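    # The path encodes an empty value for city (parse_hdfs_file_path_empty),
    # which should land in char column v3 as an empty string.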
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name, set_list=set_list,
column_name_list=column_name_list, columns_from_path=column_from_path)
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert ret
check_table = 'check_tb'
init_check_table(database_name)
sql1 = 'select * from %s order by 1, 2, 3' % table_name
sql2 = 'select k1, k2, k3, k4, k5, v1, v2, "", v4, v5, v6 from %s order by k3' % check_table
check2(sql1, sql2)
client.clean(database_name)


def test_parse_special_from_path_1():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_special_from_path_1",
    "describe": "parse the special value empty string from the path into a nullable int column; the value is loaded as NULL",
    "tag": "system,p1,fuzz"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
set_null=True)
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
column_from_path = ['city']
set_list = ['k1=city']
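    # The empty city value from the path is assigned to int key column k1;
    # with set_null=True and the default non-strict mode it becomes NULL
    # rather than failing the load (checked against sql2 below).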
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name, set_list=set_list,
column_name_list=column_name_list, columns_from_path=column_from_path)
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert ret
check_table = 'check_tb'
init_check_table(database_name)
sql1 = 'select * from %s order by 1, 2, 3' % table_name
sql2 = 'select NULL, k2, k3, k4, k5, v1, v2, v3, v4, v5, v6 from %s order by k3' % check_table
check2(sql1, sql2)
client.clean(database_name)


def test_parse_column_wrong():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_column_wrong",
    "describe": "parse a duplicate column from the load file; a duplicate column raises an error",
    "tag": "system,p1,fuzz"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)')
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
column_from_path = ['k1', 'k2', 'city']
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_normal, table_name,
column_name_list=column_name_list, columns_from_path=column_from_path)
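    # k1 and k2 appear in both column_name_list and columns_from_path, so the
    # generated load statement declares them twice and FE should reject it.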
    try:
        client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
        assert False, 'load with duplicate column should have failed'
    except Exception as e:
        print(str(e))
        assert 'Duplicate column' in str(e)
client.clean(database_name)


def test_parse_column_wrong_1():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_column_wrong_1",
    "describe": "parse a nonexistent column from the file path; the load fails",
    "tag": "system,p1,fuzz"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
set_null=True)
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'tmp_k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
column_from_path = ['k5']
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name,
column_name_list=column_name_list,
columns_from_path=column_from_path)
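    # The file path presumably carries no k5 segment (only city), so the job
    # cannot resolve k5 from the path and the load is expected to fail.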
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert not ret
client.clean(database_name)


def test_parse_column_filtered():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_column_filtered",
    "describe": "the parsed column value has no matching partition, so the rows are filtered",
    "tag": "system,p1,fuzz"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['-50', '-10', '-5', '-2']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
set_null=True)
assert ret
column_name_list = ['tmp_k1', 'k2', 'k3', 'k4', 'k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
column_from_path = ['k1', 'city']
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_empty, table_name,
column_name_list=column_name_list, columns_from_path=column_from_path)
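    # All partitions end below -2, while k1 parsed from the path (presumably
    # -1, as in the other cases) matches none of them, so every row is
    # filtered and the load fails.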
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert not ret
client.clean(database_name)


def test_parse_wrong_type():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_wrong_type",
    "describe": "parse a value of the wrong type from the file path; whether the load succeeds is controlled by the strict_mode parameter, which defaults to FALSE",
    "tag": "system,p1,fuzz"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5)',
set_null=True)
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'tmp_k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'v6']
column_from_path = ['k5']
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_float, table_name,
column_name_list=column_name_list,
columns_from_path=column_from_path)
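    # The path presumably encodes a float literal for k5 (an int column):
    # strict mode treats the failed conversion as an error row and the job
    # fails, while the default non-strict mode converts it to NULL (allowed
    # by set_null=True), so the same load then succeeds.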
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info, strict_mode=True)
assert not ret
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert ret
client.clean(database_name)


def test_parse_float():
    """
    {
    "title": "test_sys_load_parse_from_path.test_parse_float",
    "describe": "parse a decimal value from the file path",
    "tag": "system,p1"
    }
    """
database_name, table_name, rollup_table_name = util.gen_name_list()
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_name_list = ['partition_a', 'partition_b', 'partition_c', 'partition_d']
partition_value_list = ['5', '20', '30', 'MAXVALUE']
partition_info = palo_client.PartitionInfo('k1', partition_name_list,
partition_value_list)
distribution_info = palo_client.DistributionInfo('HASH(k2)', 5)
ret = client.create_table(table_name, DATA.partition_column_list_parse, partition_info,
distribution_info, keys_desc='AGGREGATE KEY (k1, k2, k3, k4, k5_t)',
set_null=True)
assert ret
column_name_list = ['k1', 'k2', 'k3', 'k4', 'tmp_k5', 'v1', 'v2', 'v3', 'v4', 'v5', 'tmp_v6']
column_from_path = ['k5']
    set_list = ['k5_t=tmp_k5', 'v6=k5']
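    # Two SET targets: k5_t takes the file's own fifth column (via the
    # tmp_k5 placeholder), and v6 takes k5 parsed from the path; sql2 below
    # expects that parsed value to be 100.12345.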
data_desc = palo_client.LoadDataInfo(FILE.parse_hdfs_file_path_float, table_name,
column_name_list=column_name_list, set_list=set_list,
columns_from_path=column_from_path)
ret = client.batch_load(util.get_label(), data_desc, is_wait=True, broker=broker_info)
assert ret
check_table = 'check_tb'
init_check_table(database_name)
sql1 = 'select * from %s order by 1, 2, 3' % table_name
sql2 = 'select k1, k2, k3, k4, k5, v1, v2, v3, v4, v5, 100.12345 from %s order by k3' % check_table
check2(sql1, sql2)
client.clean(database_name)


def teardown_module():
    """teardown"""
    pass


if __name__ == '__main__':
setup_module()
test_parse_float()