blob: 4f1c2831f32bd898e227378dc7a1f2193526aec4 [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_storage_medium.py
# @date 2020-05-07 14:48:30
# @brief 存储介质基本case。需要使用env_config.py中的be数据目录及机器登录信息
#
#############################################################################
"""
分级存储测试
"""
import time
import datetime
import pytest
from data import partition as DATA
from data import schema as SCHEMA
from lib import palo_config
from lib import palo_client
from lib import palo_job
from lib import util
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
broker_info = palo_config.broker_info
BE_DATA_PATH_EXISTS=False
def setup_module():
"""
setUp
"""
global client, BE_DATA_PATH_EXISTS
client = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user, \
password=config.fe_password)
be_host = client.get_backend_host_list()
for host in be_host:
cmd = 'ls %s' % ' '.join(config.be_data_path)
status, output = util.exec_cmd(cmd, config.host_username, config.host_password, host)
if status == 0:
BE_DATA_PATH_EXISTS = True
def check_tablet(table_tablet_info, storage_medium):
"""
检查tablet是否在be数据目录的ssd中
table_tablet_info:show tablet result
"""
if not BE_DATA_PATH_EXISTS:
return 0
storage_path_list = list()
for data_path in config.be_data_path:
if data_path.endswith(storage_medium.upper()):
storage_path_list.append(data_path + '/data/')
for tablet in table_tablet_info:
tablet_id = tablet[0]
host = client.get_backend(tablet[2])[palo_job.BackendProcInfo.Host]
for data_path in storage_path_list:
cmd = 'find %s -type d -name %s ' % (data_path, tablet_id)
status, output = util.exec_cmd(cmd, config.host_username, config.host_password, host)
print(cmd)
print(status, output)
LOG.info(L('exec cmd on remote host', cmd=cmd, username=config.host_username, host=host))
LOG.info(L('exec cmd on remote host ret', status=status, output=output))
assert status == 0
assert output.find(tablet_id) != -1
def test_ssd_all():
"""
{
"title": "存储介质SSD测试",
"describe": "复合分区表建表时指定全部分区到SSD,验证BE在SSD上建立了存储分片,导入数据后, 验证数据正确分布到了SSD,并对数据进行正确性验证",
"tag": "p1,system"
}
"""
"""
复合分区表建表时指定全部分区到SSD
验证BE在SSD上建立了存储分片
导入数据后, 验证数据正确分布到了SSD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='SSD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='SSD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='SSD')
partition_d = palo_client.Partition('partition_d', 'MAXVALUE', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1, partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
client.clean(database_name)
def test_hdd_all():
"""
{
"title": "建表指定HDD",
"describe": "复合分区表建表时指定全部分区到HDD,验证BE在HDD正确建立了存储分片,导入数据后,验证相应分区的数据正确分布到了HDD,并对数据进行正确性验证",
"tag": "p1,system"
}
"""
"""
复合分区表建表时指定全部分区到HDD
验证BE在HDD正确建立了存储分片
导入数据后,验证相应分区的数据正确分布到了HDD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', 'MAXVALUE', storage_medium='HDD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k2)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
client.clean(database_name)
def test_ssd_hdd():
"""
{
"title": "建分区表,ssd和hdd混合",
"describe": "复合分区表建表时指定部分分区到SSD,指定其余分区到HDD",
"tag": "p1,system"
}
"""
"""
复合分区表建表时指定部分分区到SSD,指定其余分区到HDD
并且保证所有BE的SSD和HDD空间充足
验证BE在SSD和HDD建立了存储分片
导入数据后,验证相应分区的数据正确分布到了SSD和HDD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='SSD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='SSD')
partition_d = palo_client.Partition('partition_d', 'MAXVALUE', storage_medium='HDD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k3)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
ret = client.show_tablet(table_name, partition_list=['partition_a', 'partition_c'])
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
ret = client.show_tablet(table_name, partition_list=['partition_b', 'partition_d'])
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
client.clean(database_name)
def test_ssd_table():
"""
{
"title": "建立ssd单分区表",
"describe": "单分区表建表时指定到SSD,验证BE在SSD上建立了存储分片,导入数据后,验证数据正确分布到了SSD,并对数据进行正确性验证",
"tag": "p1,system"
}
"""
"""
单分区表建表时指定到SSD,
验证BE在SSD上建立了存储分片
导入数据后,验证数据正确分布到了SSD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
distribution_info=distribution_info, storage_medium='SSD')
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, table_name) == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_hdd_table():
"""
{
"title": "建立hdd单分区表",
"describe": "单分区表建表时指定到HDD,验证BE在HDD上建立了存储分片,导入数据后,验证数据正确分布到了HDD,并对数据进行正确性验证",
"tag": "p1,system"
}
"""
"""
单分区表建表时指定到HDD,
验证BE在HDD上建立了存储分片
导入数据后,验证数据正确分布到了HDD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
distribution_info=distribution_info, storage_medium='HDD')
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, table_name) == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
client.clean(database_name)
def test_add_ssd_partition():
"""
{
"title": "增加ssd分区",
"describe": "复合分区表,增加分区指定到SSD,同时修改BUCKETS个数,验证BE在SSD上建立了新的分区的存储分片,导入新增分区的数据后,验证数据分布到SSD,正确性验证",
"tag": "p1,system"
}
"""
"""
复合分区表,增加分区指定到SSD,同时修改BUCKETS个数
验证BE在SSD上建立了新的分区的存储分片
导入新增分区的数据后,验证数据正确分布到了SSD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='HDD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k2)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
assert client.add_partition(table_name, 'partition_add', 'MAXVALUE',
storage_medium='SSD', distribute_type='Hash(k2)',
bucket_num=13)
assert client.get_partition_storage_medium(table_name, 'partition_add') == 'SSD'
ret = client.show_tablet(table_name, partition_list=['partition_add'])
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_add_hdd_partition():
"""
{
"title": "增加hdd分区",
"describe": "复合分区表,增加分区指定到HDD,同时修改BUCKETS个数,验证BE在SSD上建立了新的分区的存储分片,导入新增分区的数据后,验证数据正确分布到了HDD,并对数据进行正确性验证",
"tag": "autotest"
}
"""
"""
复合分区表,增加分区指定到HDD,同时修改BUCKETS个数
验证BE在SSD上建立了新的分区的存储分片
导入新增分区的数据后,验证数据正确分布到了HDD,并对数据进行正确性验证
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='SSD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='SSD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='SSD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 1)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
assert client.add_partition(table_name, 'partition_add', 'MAXVALUE',
storage_medium='HDD', distribute_type='Hash(k1)', bucket_num=13)
assert client.get_partition_storage_medium(table_name, 'partition_add') == 'HDD'
ret = client.show_tablet(table_name, partition_list=['partition_add'])
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
time.sleep(10)
print(e)
if retry_times == 0:
assert 0 == 1, 'storage medium check error'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_to_hdd_partition():
"""
{
"title": "修改ssd分区为hdd",
"describe": "主动修改分区存储属性,SSD改为HDD,HDD充足,验证BE汇报后迁移正确,数据正确",
"tag": "p1,system"
}
"""
"""
主动修改分区存储属性,SSD改为HDD,HDD充足,验证BE汇报后迁移正确,数据正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k3)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
assert client.modify_partition(table_name, 'partition_d', storage_medium='HDD')
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_to_ssd_partition():
"""
{
"title": "修改hdd分区为ssd",
"describe": "主动修改分区存储属性,HDD改为SSD,SSD充足,验证BE汇报后迁移正确,数据正确",
"tag": "p1,system"
}
"""
"""
主动修改分区存储属性,HDD改为SSD,SSD充足,验证BE汇报后迁移正确,数据正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='SSD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='SSD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='SSD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='HDD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k2)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
check_tablet(ret, 'hdd')
assert client.modify_partition(table_name, 'partition_d', storage_medium='SSD')
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
retry_times -= 1
time.sleep(30)
check_tablet(ret, 'ssd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_ssd_to_ssd_partition():
"""
{
"title": "将ssd分区改为ssd",
"describe": "主动修改分区存储属性,SSD改为SSD,验证不迁移",
"tag": "p1,system,fuzz"
}
"""
"""
主动修改分区存储属性,SSD改为SSD,验证不迁移
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='SSD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='SSD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='SSD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'SSD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
assert client.modify_partition(table_name, 'partition_d', storage_medium='SSD')
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from ssd')
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_hdd_to_hdd_partition():
"""
{
"title": "修改hdd分区为hdd",
"describe": "主动修改分区存储属性,HDD改为HDD,验证不迁移",
"tag": "p1,system,fuzz"
}
"""
"""
主动修改分区存储属性,HDD改为HDD,验证不迁移
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='HDD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 2)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
assert client.modify_partition(table_name, 'partition_d', storage_medium='HDD')
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_to_hdd_partition_at_cooldown_time():
"""
{
"title": "修改hdd分区冷却时间",
"describe": "主动修改分区storage_cooldown_time属性,HDD充足,验证属性修改正确,验证过期时间达到后BE汇报后迁移正确",
"tag": "autotest"
}
"""
"""
主动修改分区storage_cooldown_time属性,HDD充足
验证属性修改正确,验证过期时间达到后BE汇报后迁移正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k3)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
storage_cooldown_delta_time = 150
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown_time = (datetime.datetime.now() + delta).strftime('%Y-%m-%d %H:%M:%S')
assert client.modify_partition(table_name, 'partition_d', storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
time.sleep(storage_cooldown_delta_time + 30)
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_medium_cooldown_time():
"""
{
"title": "修改hdd分区为ssd,增加冷却时间",
"describe": "单分区表同时修改分区storage_medium、storage_cooldown_time属性,HDD改为SSD,SSD充足,验证属性修改正确,验证迁移正确",
"tag": "autotest"
}
"""
"""
单分区表同时修改分区storage_medium、storage_cooldown_time属性
HDD改为SSD,SSD充足,验证属性修改正确,验证迁移正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
distribution_info=distribution_info, storage_medium='HDD')
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, table_name) == 'HDD'
storage_cooldown_delta_time = 150
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown_time = (datetime.datetime.now() + delta).strftime('%Y-%m-%d %H:%M:%S')
assert client.modify_partition(table_name, table_name, storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
time.sleep(storage_cooldown_delta_time + 30)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
assert client.get_partition_storage_medium(table_name, table_name) == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_medium_cooldown_time_for_partitions():
"""
{
"title": "修改分区为ssd,修改冷却时间",
"describe": "复合分区表对多个分区同时修改分区storage_medium、storage_cooldown_time属性,HDD改为SSD,SSD充足,验证属性修改正确,验证迁移正确",
"tag": "p1,system"
}
"""
"""
复合分区表对多个分区同时修改分区storage_medium、storage_cooldown_time属性
HDD改为SSD,SSD充足,验证属性修改正确,验证迁移正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k3)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
storage_cooldown_delta_time = 150
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown_time = (datetime.datetime.now() + delta).strftime('%Y-%m-%d %H:%M:%S')
assert client.modify_partition(table_name, 'partition_c', storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
assert client.modify_partition(table_name, 'partition_d', storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
time.sleep(storage_cooldown_delta_time + 30)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_cooldown_time():
"""
{
"title": "单分区表增加冷却时间",
"describe": "单分区表SSD指定过期时间,验证过期时间达到后BE汇报后数据迁移到HDD,验证数据正确",
"tag": "autotest"
}
"""
"""
单分区表SSD指定过期时间,验证过期时间达到后BE汇报后数据迁移到HDD,验证数据正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
distribution_info=distribution_info, storage_medium='SSD')
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, table_name) == 'SSD'
storage_cooldown_delta_time = 150
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown_time = (datetime.datetime.now() + delta).strftime('%Y-%m-%d %H:%M:%S')
assert client.modify_partition(table_name, table_name, storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
time.sleep(storage_cooldown_delta_time + 30)
ret = client.show_tablet(table_name)
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'hdd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
assert client.get_partition_storage_medium(table_name, table_name) == 'HDD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_modify_invalid_cooldown_time_a():
"""
{
"title": "无效的冷却时间",
"describe": "单分区表SSD指定过去时间为过期时间,验证不迁移,报错",
"tag": "p1,system,fuzz"
}
"""
"""
单分区表SSD指定过去时间为过期时间,验证不迁移,报错
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
distribution_info=distribution_info, storage_medium='SSD')
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, table_name) == 'SSD'
storage_cooldown_delta_time = 150
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown_time = (datetime.datetime.now() - delta).strftime('%Y-%m-%d %H:%M:%S')
ret = False
try:
ret = client.modify_partition(table_name, table_name, storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
except:
pass
assert not ret
client.clean(database_name)
def test_modify_invalid_cooldown_time_b():
"""
{
"title": "无效的冷却时间",
"describe": "单分区表SSD指定当前时间为过期时间,验证报错",
"tag": "p1,system,fuzz"
}
"""
"""
单分区表SSD指定当前时间为过期时间,验证报错
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
distribution_info=distribution_info, storage_medium='SSD')
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, table_name) == 'SSD'
storage_cooldown_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
ret = False
try:
ret = client.modify_partition(table_name, table_name, storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
except:
pass
assert not ret
client.clean(database_name)
def test_cooldown_while_select():
"""
{
"title": "test_cooldown_while_select",
"describe": "复合分区表全部分区指定到SSD,部分分区指定过期时间,部分分区不指定过期时间过期时间达到前后1分钟内,持续进行查询操作",
"tag": "p1,system,stability"
}
"""
"""
复合分区表全部分区指定到SSD,SSD充足
部分分区指定过期时间,部分分区不指定过期时间
过期时间达到前后1分钟内,持续进行查询操作
验证查询结果一直正确,验证指定过期时间的分区的数据迁移到HDD
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
storage_cooldown_delta_time = 5 * 60
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown_time = (datetime.datetime.now() + delta).strftime('%Y-%m-%d %H:%M:%S')
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.verify(DATA.expected_data_file_list_1, table_name)
storage_cooldown_delta_time = 150
delta = datetime.timedelta(seconds=storage_cooldown_delta_time)
storage_cooldown = (datetime.datetime.now() + delta)
storage_cooldown_time = storage_cooldown.strftime('%Y-%m-%d %H:%M:%S')
assert client.modify_partition(table_name, 'partition_a', storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
assert client.modify_partition(table_name, 'partition_c', storage_medium='SSD',
storage_cooldown_time=storage_cooldown_time)
while datetime.datetime.now() < storage_cooldown + delta:
assert client.verify(DATA.expected_data_file_list_1, table_name)
time.sleep(5)
ret = client.show_tablet(table_name, partition_list=['partition_a', 'partition_c'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
assert client.verify(DATA.expected_data_file_list_1, table_name)
client.clean(database_name)
def test_migrate_while_loading():
"""
{
"title": "test_migrate_while_loading",
"describe": "迁移过程的同时导入,验证迁移正确,验证导入正确",
"tag": "p1,system,stability"
}
"""
"""
迁移过程的同时导入,验证迁移正确,验证导入正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k1)', 3)
assert client.create_table(table_name, DATA.schema_1,
partition_info, distribution_info)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
label = util.get_label()
assert client.batch_load(label, data_desc_list, broker=broker_info)
state = client.get_load_job_state(label)
while state != 'LOADING':
if state == 'FINISHED' or state == 'CANCELLED':
raise pytest.skip('can not get loading state')
time.sleep(1)
state = client.get_load_job_state(label)
assert client.modify_partition(table_name, 'partition_d', storage_medium='HDD')
time.sleep(30 * 2)
assert client.verify(DATA.expected_data_file_list_1, table_name)
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
client.clean(database_name)
def test_migrate_before_schema_change():
"""
{
"title": "test_migrate_before_schema_change",
"describe": "迁移过程的同时schema change,验证迁移正确,验证schema change正确",
"tag": "p1,system,stability"
}
"""
"""
迁移过程的同时schema change,验证迁移正确,验证schema change正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k2)', 3)
keys_desc = 'DUPLICATE KEY(k1,k2,k3,k4,k5)'
assert client.create_table(table_name, SCHEMA.partition_column_no_agg_list,
partition_info, distribution_info, keys_desc=keys_desc)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
assert client.modify_partition(table_name, 'partition_d', storage_medium='HDD')
column_name_list = ['k1',]
assert client.schema_change_drop_column(table_name, column_name_list, is_wait_job=True)
assert client.verify(DATA.expected_data_file_list_3, table_name)
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
client.clean(database_name)
def test_migrate_after_schema_change():
"""
{
"title": "test_migrate_after_schema_change",
"describe": "schema change的同时迁移,验证迁移正确,验证schema change正确",
"tag": "p1,system,stability"
}
"""
"""
schema change的同时迁移,验证迁移正确,验证schema change正确
"""
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name,
table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
partition_a = palo_client.Partition('partition_a', '100', storage_medium='HDD')
partition_b = palo_client.Partition('partition_b', '200', storage_medium='HDD')
partition_c = palo_client.Partition('partition_c', '300', storage_medium='HDD')
partition_d = palo_client.Partition('partition_d', '50000', storage_medium='SSD')
partition_list = [partition_a, partition_b, partition_c, partition_d]
partition_info = palo_client.PartitionInfo('k3', partition_list=partition_list)
distribution_info = palo_client.DistributionInfo('Hash(k2)', 3)
keys_desc = 'DUPLICATE KEY(k1,k2,k3,k4,k5)'
assert client.create_table(table_name, SCHEMA.partition_column_no_agg_list,
partition_info, distribution_info, keys_desc=keys_desc)
assert client.show_tables(table_name)
assert client.get_index(table_name)
assert client.get_partition_storage_medium(table_name, 'partition_a') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_b') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_c') == 'HDD'
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'SSD'
data_desc_list = palo_client.LoadDataInfo(DATA.file_path_1, table_name)
assert client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
column_name_list = ['k1',]
assert client.schema_change_drop_column(table_name, column_name_list,
is_wait_job=True, is_wait_delete_old_schema=True)
assert client.modify_partition(table_name, 'partition_d', storage_medium='HDD')
time.sleep(60)
assert client.verify(DATA.expected_data_file_list_3, table_name)
assert client.get_partition_storage_medium(table_name, 'partition_d') == 'HDD'
ret = client.show_tablet(table_name, partition_list=['partition_d'])
retry_times = 10
while retry_times > 0:
try:
time.sleep(30)
retry_times -= 1
check_tablet(ret, 'ssd')
break
except Exception as e:
print(e)
if retry_times == 0:
raise pytest.skip('timeout, cannot get tablet from hdd')
client.clean(database_name)
if __name__ == '__main__':
setup_module()
# test_ssd_all()
# test_hdd_all()
# test_ssd_table()
# test_hdd_table()
# test_ssd_hdd()
# test_add_ssd_partition()
# test_add_hdd_partition()
# test_modify_to_hdd_partition()
# test_modify_to_ssd_partition()
# test_modify_ssd_to_ssd_partition()
# test_modify_hdd_to_hdd_partition()
# test_modify_to_hdd_partition_at_cooldown_time()
# test_modify_medium_cooldown_time()
# test_modify_medium_cooldown_time_for_partitions()
# test_modify_cooldown_time()
# test_modify_invalid_cooldown_time_a()
# test_modify_invalid_cooldown_time_b()
# test_cooldown_while_select()
# test_migrate_while_loading()
test_migrate_before_schema_change()
test_migrate_after_schema_change()