blob: 4d32125cb75d0c285cb4c3fef2158b572375cf85 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
file: stream_test_partition.py
测试 stream load partition
"""
import os
import sys
import time
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.append(file_dir)
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(file_dir)
import pymysql
from data import small_load_complex as DATA
from lib import palo_config
from lib import palo_client
from lib import util
# Palo client handle; initialised in setup_module().
client = None
# Cluster connection settings shared by every test in this module.
config = palo_config.config
# Storage type taken from the shared test-data description module.
storage_type = DATA.storage_type
# Number of buckets used for hash distribution on every created table.
bucket_num = 5
# All tables in this module are hash-distributed on k1.
distribution_type = 'hash(k1)'
# Key clauses used for the non-aggregate table variants exercised by init_data().
duplicate_key = 'DUPLICATE KEY(k1, k2, k3, k4, k5, k6, k10, k11, k7)'
unique_key = 'UNIQUE KEY(k1, k2, k3, k4, k5, k6, k10, k11, k7)'
column_list = [("k1", "tinyint"), \
("k2", "smallint"), \
("k3", "int"),\
("k4", "bigint"), \
("k5", "decimal(9, 3)"),\
("k6", "char(5)"), \
("k10", "date"), \
("k11", "datetime"),\
("k7", "varchar(20)"),\
("k8", "double", "max"),\
("k9", "float", "sum")
]
column_no_agg_list = [("k1", "tinyint"), \
("k2", "smallint"), \
("k3", "int"),\
("k4", "bigint"), \
("k5", "decimal(9, 3)"),\
("k6", "char(5)"), \
("k10", "date"), \
("k11", "datetime"),\
("k7", "varchar(20)"),\
("k8", "double"),\
("k9", "float")
]
def setup_module():
    """
    Module-level setup: open a MySQL connection used to compute expected
    results, and initialise the Palo client shared by all tests here.
    """
    global client, mysql_cursor
    connection = pymysql.connect(host=config.mysql_host,
                                 port=config.mysql_port,
                                 user=config.mysql_user,
                                 passwd=config.mysql_password,
                                 db=config.mysql_db)
    mysql_cursor = connection.cursor()
    client = palo_client.PaloClient(config.fe_host, config.fe_query_port,
                                    user=config.fe_user,
                                    password=config.fe_password)
    client.init()
def check_partition_list(table_family_group_name, partition_name_list):
    """
    Assert that every partition in partition_name_list exists on the
    given table.
    """
    for name in partition_name_list:
        print("--------------------")
        print(table_family_group_name)
        print("-------------------")
        assert client.get_partition(table_family_group_name, name)
def create_table_family(table_family_group_name, column_name,
                        partition_name_list, partition_value_list, partition_name, key_type):
    """
    Create a range-partitioned, hash-distributed table and verify that
    the table and each of its partitions were created successfully.
    """
    distribution = palo_client.DistributionInfo(distribution_type, bucket_num)
    partitions = palo_client.PartitionInfo(partition_name,
                                           partition_name_list, partition_value_list)
    client.create_table(table_family_group_name, column_name,
                        partitions, distribution, keys_desc=key_type)
    assert client.show_tables(table_family_group_name)
    check_partition_list(table_family_group_name, partition_name_list)
def check(line1, line2):
    """
    Run line1 against Palo and line2 against MySQL, then assert the two
    result sets are equivalent.
    """
    print(line1)
    print(line2)
    mysql_cursor.execute(line2)
    expected = mysql_cursor.fetchall()
    actual = client.execute(line1)
    util.check(actual, expected)
def init_data(data_file, database_name, table_family_name,
              partition_name_list, partition_value_list, partition_name):
    """
    Stream-load data_file into a freshly created partitioned table and
    verify the loaded rows against MySQL.

    For each key model (aggregate, DUPLICATE, UNIQUE): drop/recreate the
    database, create the table, stream-load into only the first two
    partitions with max_filter_ratio=0.65, then compare the table contents
    with the matching rows of the MySQL `baseall` reference table.
    """
    for key_type in [None, duplicate_key, unique_key]:
        client.clean(database_name)
        ret = client.create_database(database_name)
        assert ret
        client.use(database_name)
        # Aggregate model keeps the per-column aggregation functions;
        # DUPLICATE/UNIQUE key tables must not declare any.
        if key_type is None:
            column_type_list = column_list
        else:
            column_type_list = column_no_agg_list
        create_table_family(table_family_name, column_type_list, partition_name_list,
                            partition_value_list, partition_name, key_type)
        # Load only into the first two partitions; rows falling outside
        # them are filtered (hence the generous max_filter_ratio).
        # Fixed typo: was `sub_partitio_list`.
        sub_partition_list = partition_name_list[0:2]
        ret = client.stream_load(table_family_name, data_file, partition_list=sub_partition_list,
                                 max_filter_ratio=0.65, column_name_list=DATA.column_name_list)
        assert ret
        # Implicit string concatenation instead of a backslash inside the
        # literal, which embedded source indentation into the SQL text.
        line1 = ("select k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11 "
                 "from %s order by k1" % table_family_name)
        line2 = ("select k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11 "
                 "from baseall where %s < \"%s\" order by k1"
                 % (partition_name, partition_value_list[1]))
        check(line1, line2)
    # Final cleanup after the last key-model iteration.
    client.clean(database_name)
def test_type_k1():
    """
    {
    "title": "测试建表时,各个不同分区的导入, 使用k1建立分区,并导入",
    "describe": "describe",
    "tag": "function,p1,fuzz"
    }
    """
    # Partition on k1 (tinyint) and run the create/load/verify cycle.
    # A second, dead string-literal "docstring" (a no-op expression
    # statement copy-pasted across all tests) was removed.
    # rollup_table_name is unused here; kept for the 3-tuple unpack.
    database_name, table_family_name, rollup_table_name = util.gen_name_list()
    data_file = "%s/data/small_load_simple/test_small_load_base.data" % file_dir
    partition_name_list = ['partition_a', 'partition_b', 'partition_c']
    partition_value_list = ['5', '10', 'MAXVALUE']
    partition_name = "k1"
    print("***********************")
    print(partition_name)
    init_data(data_file, database_name, table_family_name, partition_name_list,
              partition_value_list, partition_name)
def test_type_k2():
    """
    {
    "title": "test_stream_partition.test_type_k2",
    "describe": "测试建表时,各个不同分区的导入, 使用k2建立分区,并导入",
    "tag": "function,p1,fuzz"
    }
    """
    # Partition on k2 (smallint) and run the create/load/verify cycle.
    # A dead, copy-pasted second string literal was removed.
    # rollup_table_name is unused here; kept for the 3-tuple unpack.
    database_name, table_family_name, rollup_table_name = util.gen_name_list()
    data_file = "%s/data/small_load_simple/test_small_load_base.data" % file_dir
    partition_name_list = ['partition_a', 'partition_b', 'partition_c']
    partition_value_list = ['0', '2000', 'MAXVALUE']
    partition_name = "k2"
    print("***********************")
    print(partition_name)
    init_data(data_file, database_name, table_family_name, partition_name_list,
              partition_value_list, partition_name)
def test_type_k3():
    """
    {
    "title": "test_stream_partition.test_type_k3",
    "describe": "测试建表时,各个不同分区的导入, 使用k3建立分区,并导入",
    "tag": "function,p1,fuzz"
    }
    """
    # Partition on k3 (int) and run the create/load/verify cycle.
    # A dead, copy-pasted second string literal was removed.
    # rollup_table_name is unused here; kept for the 3-tuple unpack.
    database_name, table_family_name, rollup_table_name = util.gen_name_list()
    data_file = "%s/data/small_load_simple/test_small_load_base.data" % file_dir
    partition_name_list = ['partition_a', 'partition_b', 'partition_c']
    partition_value_list = ['0', '2000', 'MAXVALUE']
    partition_name = "k3"
    print("***********************")
    print(partition_name)
    init_data(data_file, database_name, table_family_name, partition_name_list,
              partition_value_list, partition_name)
def test_type_k4():
    """
    {
    "title": "test_stream_partition.test_type_k4",
    "describe": "测试建表时,各个不同分区的导入, 使用k4建立分区,并导入",
    "tag": "function,p1,fuzz"
    }
    """
    # Partition on k4 (bigint) and run the create/load/verify cycle.
    # A dead, copy-pasted second string literal was removed.
    # rollup_table_name is unused here; kept for the 3-tuple unpack.
    database_name, table_family_name, rollup_table_name = util.gen_name_list()
    data_file = "%s/data/small_load_simple/test_small_load_base.data" % file_dir
    partition_name_list = ['partition_a', 'partition_b', 'partition_c']
    partition_value_list = ['0', '11011905', 'MAXVALUE']
    partition_name = "k4"
    print("***********************")
    print(partition_name)
    init_data(data_file, database_name, table_family_name, partition_name_list,
              partition_value_list, partition_name)
def test_type_k10():
    """
    {
    "title": "test_stream_partition.test_type_k10",
    "describe": "测试建表时,各个不同分区的导入, 使用k10建立分区,并导入",
    "tag": "function,p1,fuzz"
    }
    """
    # Partition on k10 (date) and run the create/load/verify cycle.
    # A dead, copy-pasted second string literal was removed.
    # rollup_table_name is unused here; kept for the 3-tuple unpack.
    database_name, table_family_name, rollup_table_name = util.gen_name_list()
    data_file = "%s/data/small_load_simple/test_small_load_base.data" % file_dir
    partition_name_list = ['partition_a', 'partition_b', 'partition_c']
    partition_value_list = ['1910-01-01', '2012-03-14', 'MAXVALUE']
    partition_name = "k10"
    print("***********************")
    print(partition_name)
    init_data(data_file, database_name, table_family_name, partition_name_list,
              partition_value_list, partition_name)
def test_type_k11():
    """
    {
    "title": "test_type_k11",
    "describe": "测试建表时,各个不同分区的导入, 使用k11建立分区,并导入",
    "tag": "function,p1,fuzz"
    }
    """
    # Partition on k11 (datetime) and run the create/load/verify cycle.
    # A dead, copy-pasted second string literal was removed.
    # rollup_table_name is unused here; kept for the 3-tuple unpack.
    database_name, table_family_name, rollup_table_name = util.gen_name_list()
    data_file = "%s/data/small_load_simple/test_small_load_base.data" % file_dir
    partition_name_list = ['partition_a', 'partition_b', 'partition_c']
    partition_value_list = ['1910-01-01 00:00:00', '2012-03-14 00:00:00', 'MAXVALUE']
    partition_name = "k11"
    print("***********************")
    print(partition_name)
    init_data(data_file, database_name, table_family_name, partition_name_list,
              partition_value_list, partition_name)
    # Extra cleanup only this test performs; redundant with init_data's
    # final clean but harmless — kept to preserve behavior.
    client.clean(database_name)
def teardown_module():
    """
    Module-level teardown.

    The original body was `pass`, leaking the MySQL cursor opened in
    setup_module(); close it here. Guarded via globals() so teardown is
    safe even when setup_module() never ran (e.g. setup failure).
    """
    cursor = globals().get("mysql_cursor")
    if cursor is not None:
        cursor.close()
if __name__ == '__main__':
    # Manual entry point: run setup and a single smoke test (k1 only).
    # Under a test runner (pytest/nose), setup_module/teardown_module
    # are invoked automatically and all test_* functions are collected.
    setup_module()
    test_type_k1()