blob: 9f97930e2edf06830ef634dfc02062c8f945be0c [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_resource_tag.py
# @date 2021-11-02 15:18:00
# @brief This file is a test file for resource tag
#
#############################################################################
"""
resource tag 测试,因修改了be节点标签,会导致默认副本建表失败,需单独运行测试
"""
import sys
import time
sys.path.append("../")
from data import schema
from data import load_file
from lib import palo_config
from lib import palo_client
from lib import palo_job
from lib import util
from lib import common
from lib import node_op
client = None
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
def setup_module():
"""
setUp
"""
global client
global host_list
global port
global node_operator
node_operator = node_op.Node()
client = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
ret = client.get_backend_list()
port = util.get_attr(ret, palo_job.BackendProcInfo.HeartbeatPort)[0]
try:
resource_tag_initialize(node_operator.get_be_ip_list(), port)
except Exception as e:
LOG.info(L('init resource tag error', msg=str(e)))
print(e)
be_list = util.get_attr_condition_list(ret, palo_job.BackendProcInfo.Alive,
'false', palo_job.BackendProcInfo.Host)
if be_list is not None:
for be_host in be_list:
node_operator.start_be(be_host)
assert node_operator.is_be_alive(be_host)
host_list = client.get_backend_host_ip()
def resource_tag_initialize(host_name_list, port):
"""初始化设置"""
time.sleep(2)
be_host_list = client.get_backend_host_ip()
for host_name in host_name_list:
if host_name not in be_host_list:
try:
client.add_backend(host_name, port)
except Exception as e:
LOG.info(L('add backend fail.', backend=host_name, port=port, msg=str(e)))
tag = client.get_resource_tag(host_name)
if tag != 'default':
client.modify_resource_tag(host_name, port, "default")
client.set_properties("'resource_tags.location'= ''", user='root')
client.set_variables('exec_mem_limit', '2G', is_global=True)
time.sleep(2)
def check_resource_tag(host_ip_list, tag_location_list):
"""验证be标签是否设置正确"""
if not isinstance(host_ip_list, list):
host_list = [host_ip_list,]
if not isinstance(tag_location_list, list):
tag_location_list = [tag_location_list,]
check_tag_list = []
for i in range(len(host_list)):
check_tag_list.append(client.get_resource_tag(host_list[i]))
assert check_tag_list == tag_location_list, \
"backend resource tag is incorrect, expect %s, actural %s" % (tag_location_list, check_tag_list)
def check_resource_tag_by_id(backend_id_list, tag_location_list):
"""验证be标签是否设置正确"""
if not isinstance(backend_id_list, list):
host_name_list = [backend_id_list,]
if not isinstance(tag_location_list, list):
tag_location_list = [tag_location_list,]
check_tag_list = []
for i in range(len(backend_id_list)):
check_tag_list.append(client.get_resource_tag_by_id(backend_id_list[i]))
assert check_tag_list.sort() == tag_location_list.sort(), "backend resource tag is incorrect"
def host_port(host, port):
"""生成格式'host:port'"""
return host + ":" + port
def create(table_name, replication_allocation=None, replication_num=None, column_list=None, \
partition_info=None, keys_desc=None):
"""建表"""
if not column_list:
column_list = schema.baseall_column_list
distribution_info = palo_client.DistributionInfo('HASH(k1)', 1)
client.create_table(table_name, column_list, partition_info, distribution_info, replication_num=replication_num, \
keys_desc=keys_desc, replication_allocation=replication_allocation)
assert client.show_tables(table_name), 'create table failed'
def check_data(table_name):
"""导入查询验证"""
assert client.stream_load(table_name, load_file.baseall_local_file)
sql_1 = 'select * from %s order by k1' % table_name
sql_2 = 'select * from test_query_qa.baseall order by k1'
util.check(client.execute(sql_1), client.execute(sql_2), True)
def check_data_2(table_name):
"""用于修改标签后的二次导入查询验证"""
column = ["c1,c2,c3,c4,c5,c6,c10,c11,c7,c8,c9,k1=c1+15,k2=c2,k3=c3,k4=c4,k5=c5,k6=c6,k10=c10,k11=c11,k7=c7,\
k8=c8,k9=c9"]
assert client.stream_load(table_name, load_file.baseall_local_file, column_name_list=column)
sql_1 = 'select * from %s order by k1' % table_name
sql_2 = 'select * from (select * from test_query_qa.baseall union select k1+15 as k1,k2,k3,k4,k5,k6,k10,k11,k7,\
k8,k9 from test_query_qa.baseall)a order by k1'
util.check(client.execute(sql_1), client.execute(sql_2), True)
def wait_replica_transfer(table_name, backend_id):
"""等待副本完成迁移"""
time_limit = 200
LOG.info(L('WAIT TRANSFER', backend_id=backend_id))
client.admin_repair_table(table_name)
while time_limit > 0:
new_backend_id = client.get_replica_backend_id(table_name)
if new_backend_id != backend_id:
LOG.info(L('TRANSFER RED', backend_id=new_backend_id))
return True
time_limit -= 1
time.sleep(3)
LOG.info(L('TRANSFER RET', backend_id=new_backend_id))
LOG.info(L('REPLICA TRANSFER FAIL:TIMEOUT', backend_id=backend_id))
return False
def test_add_tag():
"""
{
"title": "test_sys_resource_tag:test_add_tag",
"describe": "drop be节点后,add be节点并设置标签和默认标签",
"tag": "system,p0"
}
"""
#add tag
resource_tag_initialize(host_list, port)
for host in host_list:
client.drop_backend_list(host_port(host, port))
client.add_backend(host, port, "group_a")
check_resource_tag(host, "group_a")
#add default tag
for host in host_list:
client.drop_backend_list(host_port(host, port))
client.add_backend(host, port, "default")
check_resource_tag(host, "default")
resource_tag_initialize(host_list, port)
def test_add_tag_twice():
"""
{
"title": "test_sys_resource_tag:test_add_tag_twice",
"describe": "drop be节点后,add be节点并设置标签,设置相同标签再次add该节点失败",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
client.drop_backend_list(host_port(host, port))
client.add_backend(host, port, "group_a")
check_resource_tag(host, "group_a")
util.assert_return(False, 'Same backend already exists', client.add_backend, host, port, "group_a")
resource_tag_initialize(host_list, port)
def test_add_two_tags():
"""
{
"title": "test_sys_resource_tag:test_add_two_tags",
"describe": "drop be节点后,add be节点并设置两个标签,be节点添加成功",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
client.drop_backend_list(host_port(host, port))
util.assert_return(False, 'Invalid tag', client.add_backend, host, port, "group_a,group_b")
resource_tag_initialize(host_list, port)
resource_tag_initialize(host_list, port)
def test_incorrect_host_port():
"""
{
"title": "test_sys_resource_tag:test_incorrect_host_port",
"describe": "drop be节点后,add be节点并设置标签,设置的be host和port错误,be节点添加失败",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
#incorrect host
host = '0.0.0.0'
client.add_backend(host, port, "group_a")
backend_list = client.get_backend_list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_ip() == host:
assert backend[palo_job.BackendProcInfo.Alive] == 'false'
client.drop_backend_list(host_port(host, port))
#incorrect port
incorrect_port = '11000'
host = host_list[0]
client.add_backend(host, incorrect_port, "group_a")
backend_list = client.get_backend_list()
for backend in backend_list:
if backend[palo_job.BackendProcInfo.HeartbeatPort] == incorrect_port:
assert backend[palo_job.BackendProcInfo.Alive] == 'false'
client.drop_backend_list(host_port(host, incorrect_port))
resource_tag_initialize(host_list, port)
def test_tag_format():
"""
{
"title": "test_sys_resource_tag:test_tag_format",
"describe": "drop be节点,add be节点并设置标签,验证标签格式,仅支持小写字母、数字和下划线,且必须为小写字母开头",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
#good tag format
tag_list = ['a', 'a_', 'a1', 'abcd_efgh_abcd_efgh_abcd_efgh_abc', 'aA']
for tag in tag_list:
client.drop_backend_list(host_port(host, port))
client.add_backend(host, port, tag)
check_resource_tag(host, tag)
tag_list = ['_a', '1a', 'a*', '', ' ', 'abcd_efgh_abcd_efgh_abcd_efgh_abcd']
for tag in tag_list:
util.assert_return(False, 'Invalid tag', client.add_backend, host, port, tag)
client.drop_backend_list(host_port(host, port))
client.add_backend(host, port, 'default')
check_resource_tag(host, 'default')
client.drop_backend_list(host_port(host, port))
def test_modify_tag():
"""
{
"title": "test_sys_resource_tag:test_modify_tag",
"describe": "修改be节点标签,再修改be节点为默认标签",
"tag": "system,p0"
}
"""
resource_tag_initialize(host_list, port)
for host in host_list:
client.modify_resource_tag(host, port, 'group_a')
check_resource_tag(host, 'group_a')
#default tag
for host in host_list:
client.modify_resource_tag(host, port, 'default')
check_resource_tag(host, 'default')
resource_tag_initialize(host_list, port)
def test_modify_tag_format():
"""
{
"title": "test_sys_resource_tag:test_modify_tag",
"describe": "修改be节点标签,验证标签格式,仅支持小写字母、数字和下划线,且必须为小写字母开头",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
#good tag format
tag_list = ['a', 'a_', 'a1', 'abcd_efgh_abcd_efgh_abcd_efgh_abc', 'aA']
for tag in tag_list:
client.modify_resource_tag(host, port, tag)
check_resource_tag(host, tag)
# bad tag format
tag_list = ['_a', '1a', 'a*', '', ' ', 'abcd_efgh_abcd_efgh_abcd_efgh_abcd']
for tag in tag_list:
util.assert_return(False, 'Invalid tag', client.modify_resource_tag, host, port, tag)
client.modify_resource_tag(host, port, 'default')
check_resource_tag(host, 'default')
def test_modify_tag_incorrect_host_port():
"""
{
"title": "test_sys_resource_tag:test_modify_tag_incorrect_host_port",
"describe": "修改be节点标签,be节点的host和port配置错误,修改标签失败",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
#incorrect host
host = '0.0.0.0'
util.assert_return(False, 'backend does not exists', client.modify_resource_tag, host, port, 'group_a')
#incorrect port
incorrect_port = '11000'
host = host_list[0]
util.assert_return(False, 'backend does not exists', client.modify_resource_tag, host, incorrect_port, 'group_a')
resource_tag_initialize(host_list, port)
def test_modify_tag_after_drop():
"""
{
"title": "test_sys_resource_tag:test_modify_tag_after_drop",
"describe": "drop be节点后修改be节点标签,修改标签失败",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
client.drop_backend_list(host_port(host, port))
util.assert_return(False, 'backend does not exists', client.modify_resource_tag, host, port, 'group_a')
resource_tag_initialize(host_list, port)
def test_modify_same_tag():
"""
{
"title": "test_sys_resource_tag:test_modify_same_tag",
"describe": "修改be节点标签与原标签相同,修改标签成功",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
for host in host_list:
client.modify_resource_tag(host, port, 'group_a')
check_resource_tag(host, 'group_a')
client.modify_resource_tag(host, port, 'group_a')
check_resource_tag(host, 'group_a')
resource_tag_initialize(host_list, port)
def test_modify_tag_many_times():
"""
{
"title": "test_sys_resource_tag:test_modify_tag_many_times",
"describe": "多次修改be节点标签,修改标签成功",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
for i in range(50):
tag = 'a' + str(i)
client.modify_resource_tag(host, port, tag)
check_resource_tag(host, tag)
resource_tag_initialize(host_list, port)
def test_restart_be_after_modify_tag():
"""
{
"title": "test_sys_resource_tag:test_restart_be_after_modify_tag",
"describe": "修改be节点标签,重启be,be标签状态正确",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
client.modify_resource_tag(host, port, 'group_a')
check_resource_tag(host, 'group_a')
backend_list = client.get_backend_list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_ip() == host:
host_name = backend[palo_job.BackendProcInfo.Host]
assert node_operator.stop_be(host_name)
assert node_operator.start_be(host_name)
time.sleep(30)
check_resource_tag(host, 'group_a')
resource_tag_initialize(host_list, port)
def test_restart_be_after_add_tag():
"""
{
"title": "test_sys_resource_tag:test_restart_be_after_add_tag",
"describe": "drop be节点后,add be节点并添加标签,重启be,be标签状态正确",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
host = host_list[0]
client.drop_backend_list(host_port(host, port))
client.add_backend(host, port, 'group_a')
check_resource_tag(host, 'group_a')
backend_list = client.get_backend_list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_ip() == host:
host_name = backend[palo_job.BackendProcInfo.Host]
assert node_operator.stop_be(host_name)
assert node_operator.start_be(host_name)
time.sleep(30)
check_resource_tag(host, 'group_a')
resource_tag_initialize(host_list, port)
def test_create_table_default_tag():
"""
{
"title": "test_sys_resource_tag:test_create_table_default_tag",
"describe": "按照默认be标签建表,设置正确的副本数,建表成功,副本数量和标签正确,数据导入查询正确",
"tag": "function,p0"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
replication_allocation = "tag.location.default:3"
create(table_name, replication_allocation)
backend_id_list = client.get_replica_backend_id(table_name)
tag_location_list = ['default'] * 3
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
def test_create_table_default_tag_false():
"""
{
"title": "test_sys_resource_tag:test_create_table_default_tag_false",
"describe": "按照默认be标签建表,设置错误的副本数,建表失败",
"tag": "function,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
replication_allocation = "tag.location.default:10"
util.assert_return(False, 'Failed to find 10 backend(s) for policy', create, table_name, replication_allocation)
replication_allocation = "tag.location.default:-1"
util.assert_return(False, 'Total replication num should between 1 and 32767', create,
table_name, replication_allocation)
replication_allocation = "tag.location.default:"
util.assert_return(False, 'Invalid replication allocation property', create, table_name, replication_allocation)
replication_allocation = "tag.location.default:a"
util.assert_return(False, 'Unexpected exception', create, table_name, replication_allocation)
client.clean(database_name)
def test_tag_location():
"""
{
"title": "test_sys_resource_tag:test_tag_location",
"describe": "修改be节点标签,建表时设置正确的副本标签,建表成功,表副本标签正确,数据导入查询正确",
"tag": "function,p0"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
create(table_name, replication_allocation)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_location_false():
"""
{
"title": "test_sys_resource_tag:test_tag_location_false",
"describe": "修改be节点标签,建表时设置错误的副本标签,建表失败",
"tag": "function,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_a', 'group_b']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1,tag.location.group_b:2"
util.assert_return(False, 'Failed to find 2 backend(s) for policy', create, table_name, replication_allocation)
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
util.assert_return(False, 'Failed to find 1 backend(s) for policy', create, table_name, replication_allocation)
replication_allocation = "tag.location.group_a:1,tag.location.group_b:,"
util.assert_return(False, 'Invalid replication allocation property', create, table_name, replication_allocation)
replication_allocation = "tag.location.group_a:1,tag.location.group_b:a,"
util.assert_return(False, 'Unexpected exception: For input string: "a"', create, table_name, \
replication_allocation)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_create_modify_tag():
"""
{
"title": "test_sys_resource_tag:test_create_modify_tag",
"describe": "修改be节点标签,建表时设置副本标签,建表成功,再修改be标签,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
host = host_list[0]
client.modify_resource_tag(host, port, 'group_a')
check_resource_tag(host, 'group_a')
replication_allocation = "tag.location.group_a:1"
#建表,设置副本标签
create(table_name, replication_allocation)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, 'group_a')
check_data(table_name)
#修改BE标签
client.modify_resource_tag(host, port, 'group_b')
check_resource_tag(host, 'group_b')
check_resource_tag_by_id(backend_id_list, 'group_b')
check_data_2(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_replication_num_and_allocation():
"""
{
"title": "test_sys_resource_tag:test_replication_num_and_allocation",
"describe": "修改be节点标签,建表时设置副本标签,同时指定replication_num和replication_allocation,\
仅replication_num参数生效,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
host = host_list[0]
client.modify_resource_tag(host, port, 'group_a')
check_resource_tag(host, 'group_a')
replication_allocation = "tag.location.group_a:1"
create(table_name, replication_allocation, '2')
backend_id_list = client.get_replica_backend_id(table_name)
tag_location_list = ['default', 'default']
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_aggregate():
"""
{
"title": "test_sys_resource_tag:test_tag_aggregate",
"describe": "修改be节点标签,创建aggregate表,设置副本标签,建表成功,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
keys_desc = "aggregate key(k1,k2,k3,k4,k5,k6,k10,k11,k7)"
create(table_name, replication_allocation, keys_desc=keys_desc)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_unique():
"""
{
"title": "test_sys_resource_tag:test_tag_unique",
"describe": "修改be节点标签,创建unique表,设置副本标签,建表成功,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
keys_desc = "unique key(k1)"
create(table_name, replication_allocation, column_list=schema.baseall_column_no_agg_list, keys_desc=keys_desc)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_duplicate():
"""
{
"title": "test_sys_resource_tag:test_tag_duplicate",
"describe": "修改be节点标签,创建duplicate表,设置副本标签,建表成功,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
keys_desc = "duplicate key(k1)"
create(table_name, replication_allocation, column_list=schema.baseall_column_no_agg_list, keys_desc=keys_desc)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_range_partition():
"""
{
"title": "test_sys_resource_tag:test_tag_range_partition",
"describe": "修改be节点标签,创建range分区表,设置副本标签,建表成功,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
partition_name_list = ['partiiton_a', 'partition_b', 'partition_c']
partition_value_list = ['10', '20', '40']
partition_info = palo_client.PartitionInfo('k1', partition_name_list, partition_value_list)
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
create(table_name, replication_allocation, partition_info=partition_info)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_list_partition():
"""
{
"title": "test_sys_resource_tag:test_tag_list_partition",
"describe": "修改be节点标签,创建list分区表,设置副本标签,建表成功,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
partition_name_list = ['partiiton_a', 'partition_b']
partition_value_list = [('true'), ('false')]
partition_info = palo_client.PartitionInfo('k6', partition_name_list, partition_value_list, \
partition_type='LIST')
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
create(table_name, replication_allocation, partition_info=partition_info)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_tag_composite_partition():
"""
{
"title": "test_sys_resource_tag:test_tag_composite_partition",
"describe": "修改be节点标签,创建复合分区表,设置副本标签,建表成功,数据导入查询正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b', 'group_c']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
partition_name_list = ['partiiton_a', 'partition_b', 'partition_c']
partition_value_list = [('10', '1000'), ('20', '10000'), ('30', '32767')]
partition_info = palo_client.PartitionInfo(['k1', 'k2'], partition_name_list, partition_value_list)
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1,tag.location.group_c:1"
create(table_name, replication_allocation, partition_info=partition_info)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_alter_partition_tag():
"""
{
"title": "test_sys_resource_tag:test_alter_partition_tag",
"describe": "修改be节点标签,创建分区表,设置副本标签建表成功,修改分区表的副本配置正确,数据导入查询正确",
"tag": "function,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_a', 'group_b']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
partition_name_list = ['partiiton_a', 'partition_b', 'partition_c']
partition_value_list = ['5', '15', '35']
partition_info = palo_client.PartitionInfo('k1', partition_name_list, partition_value_list)
replication_allocation = "tag.location.group_a:1,tag.location.group_b:1"
create(table_name, replication_allocation, partition_info=partition_info)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, ['group_a', 'group_b'])
check_data(table_name)
client.modify_partition(table_name, 'partition_b', replication_allocation='tag.location.group_a:2')
assert client.get_partition_replica_allocation(table_name, 'partition_b') == 'tag.location.group_a: 2', \
"partition replica allocation false"
check_data_2(table_name)
#副本数量设置错误
util.assert_return(False, 'Failed to find enough host', client.modify_partition, table_name, 'partition_b', \
replication_allocation='tag.location.group_a:3')
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_create_be_restart():
"""
{
"title": "test_sys_resource_tag:test_create_be_restart",
"describe": "修改be节点标签,建表时设置副本标签,建表成功,重启be,数据导入查询正确",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_a', 'group_b']
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:2, tag.location.group_b:1"
create(table_name, replication_allocation)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
#重启BE
backend_list = client.get_backend_list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_ip() in host_list:
host_name = backend[palo_job.BackendProcInfo.Host]
assert node_operator.stop_be(host_name)
assert node_operator.start_be(host_name)
time.sleep(20)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data_2(table_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_replica_balance():
"""
{
"title": "test_sys_resource_tag:test_replica_balance",
"describe": "修改be节点标签,设置副本标签创建30个表,验证表副本均匀分布在各个be",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_a', 'group_a']
be_id_list = []
backend_list = client.get_backend_list()
for backend in backend_list:
if palo_job.BackendProcInfo(backend).get_ip() in host_list[:3]:
be_id = backend[palo_job.BackendProcInfo.BackendId]
if be_id not in be_id_list:
be_id_list.append(be_id)
table_nums = [0, 0, 0]
for i in range(3):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1"
for i in range(60):
table_name_s = '%s_%d' % (table_name, i)
create(table_name_s, replication_allocation)
backend_id_list = client.get_replica_backend_id(table_name_s)
for i in range(3):
if backend_id_list[0] == be_id_list[i]:
table_nums[i] += 1
for i in range(3):
assert table_nums[i] >= 10 and table_nums[i] <= 30, "replica not balance"
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_rollup_tag():
"""
{
"title": "test_sys_resource_tag:test_tag_aggregate",
"describe": "修改be节点标签,设置副本标签建表,建表成功,创建rollup表,验证rollup表副本标签正确",
"tag": "function,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1, tag.location.group_b:1"
create(table_name, replication_allocation)
backend_id_list = client.get_replica_backend_id(table_name)
check_resource_tag_by_id(backend_id_list, tag_location_list)
check_data(table_name)
#创建rollup表
rollup_name = table_name + '_r'
client.create_rollup_table(table_name, rollup_name, ['k1', 'k2'], is_wait=True)
backend_id_list = client.get_replica_backend_id(table_name)
if backend_id_list[1] == backend_id_list[0]:
assert backend_id_list[2] == backend_id_list[3]
else:
assert backend_id_list[2:].sort() == backend_id_list[:2].sort(), 'rollup table replica false'
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_usr_priv_tag():
"""
{
"title": "test_sys_resource_tag:test_usr_priv_tag",
"describe": "修改be节点标签,建表,创建用户并设置用户资源权限,该用户查询不同标签表,验证资源权限设置正确",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation_1 = "tag.location.group_a:1"
create(table_name, replication_allocation_1)
replication_allocation_2 = "tag.location.group_b:1"
check_data(table_name)
table_name_2 = table_name + '_2'
create(table_name_2, replication_allocation_2)
check_data(table_name_2)
#create user
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
privilege_list = ['SELECT_PRIV']
client.grant(user_name, privilege_list, database_name)
kv = "'resource_tags.location'= 'group_a'"
client.set_properties(kv, user=user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#对有权限的表查询验证
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
#对无权限的表查询验证
sql = "select * from %s order by k1" % table_name_2
util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
client.clean_user(user_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_user_set_property():
"""
{
"title": "test_sys_resource_tag:test_usr_set_property",
"describe": "修改be节点标签,创建两个普通用户,使用其中一个用户对另一个设置用户资源权限失败",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
client.modify_resource_tag(host_list[0], port, 'group_a')
check_resource_tag(host_list[0], 'group_a')
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
user_name_set = 'test_resource_tag_set'
client.clean_user(user_name_set)
client.create_user(user_name_set)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name_set, password='', \
http_port=config.fe_http_port)
kv = "'resource_tags.location'= 'group_a'"
util.assert_return(False, 'Access denied', client_tag.set_properties, kv, user=user_name)
client.clean_user(user_name)
client.clean_user(user_name_set)
resource_tag_initialize(host_list, port)
def test_admin_user_set_property():
"""
{
"title": "test_sys_resource_tag:test_admin_user_set_property",
"describe": "修改be节点标签,建表,创建普通用户和admin用户,使用admin用户对普通用户设置资源权限,验证权限设置正确",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
#设置副本标签,建表
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation_1 = "tag.location.group_a:1"
create(table_name, replication_allocation_1)
replication_allocation_2 = "tag.location.group_b:1"
check_data(table_name)
table_name_2 = table_name + '_2'
create(table_name_2, replication_allocation_2)
check_data(table_name_2)
#设置admin用户和添加权限用户
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
privilege_list = ['SELECT_PRIV']
client.grant(user_name, privilege_list, database_name)
user_name_set = 'test_resource_tag_set'
client.clean_user(user_name_set)
client.create_user(user_name_set)
privilege_list = ['ADMIN_PRIV']
client.grant(user_name_set, privilege_list, database='*', table='*', catalog='*')
#添加用户资源权限
client_tag_set = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name_set, password='', \
http_port=config.fe_http_port)
kv = "'resource_tags.location'= 'group_a'"
client_tag_set.set_properties(kv, user=user_name)
#对有权限的表查询验证
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
#对无权限的表查询验证
sql = "select * from %s order by k1" % table_name_2
util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
#清理用户和测试数据库,标签初始化
client.clean_user(user_name)
client.clean_user(user_name_set)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_set_property_for_root():
"""
{
"title": "test_sys_resource_tag:test_set_property_for_root",
"describe": "修改be节点标签,建表,创建admin用户,使用admin用户对root用户设置资源权限,验证资源权限设置正确",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
#设置副本标签,建表
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation_1 = "tag.location.group_a:1"
create(table_name, replication_allocation_1)
replication_allocation_2 = "tag.location.group_b:1"
check_data(table_name)
table_name_2 = table_name + '_2'
create(table_name_2, replication_allocation_2)
check_data(table_name_2)
#设置admin用户
user_name_set = 'test_resource_tag_set'
client.clean_user(user_name_set)
client.create_user(user_name_set)
privilege_list = ['ADMIN_PRIV']
client.grant(user_name_set, privilege_list, database='*', table='*', catalog='*')
#添加root用户资源权限
client_tag_set = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name_set, \
password='', http_port=config.fe_http_port)
kv = "'resource_tags.location'= 'group_a'"
client_tag_set.set_properties(kv, user='root')
#对有权限的表查询验证
client_root = palo_client.get_client(config.fe_host, config.fe_query_port, user='root', \
password=config.fe_password, http_port=config.fe_http_port)
client_root.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_root.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
#对无权限的表查询验证
sql = "select * from %s order by k1" % table_name_2
util.assert_return(False, 'have no queryable replicas', client_root.execute, sql)
#清理用户和测试数据库,标签初始化
client.clean_user(user_name_set)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_set_property_for_role():
"""
{
"title": "test_sys_resource_tag:test_usr_priv_tag",
"describe": "修改be节点标签,建表,创建role并设置用户资源权限,设置用户资源权限失败",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
client.modify_resource_tag(host_list[0], port, 'group_a')
check_resource_tag(host_list[0], 'group_a')
role_name = 'test_resource_tag_role'
try:
client.drop_role(role_name)
except:
pass
client.create_role(role_name)
kv = "'resource_tags.location'= 'group_a'"
util.assert_return(False, 'Unknown user', client.set_properties, kv, role_name)
client.drop_role(role_name)
resource_tag_initialize(host_list, port)
def test_set_property_for_role_user():
"""
{
"title": "test_sys_resource_tag:test_set_property_for_role_user",
"describe": "修改be节点标签,建表,创建用户并指定role,设置用户资源权限,验证资源权限设置正确",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation_1 = "tag.location.group_a:1"
create(table_name, replication_allocation_1)
replication_allocation_2 = "tag.location.group_b:1"
check_data(table_name)
table_name_2 = table_name + '_2'
create(table_name_2, replication_allocation_2)
check_data(table_name_2)
#create role
role_name = 'test_resource_tag_role'
try:
client.drop_role(role_name)
except:
pass
client.create_role(role_name)
privilege_list = ['SELECT_PRIV']
client.grant(role_name, privilege_list, database_name, is_role=True)
#create user
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name, default_role=role_name)
kv = "'resource_tags.location'= 'group_a'"
client.set_properties(kv, user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#对有权限的表查询验证
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
#对无权限的表查询验证
sql = "select * from %s order by k1" % table_name_2
util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
client.clean_user(user_name)
client.drop_role(role_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_set_property_empty():
"""
{
"title": "test_sys_resource_tag:test_set_property_empty",
"describe": "修改be节点标签,建表,创建用户并设置用户资源权限为空值,验证用户具有所有资源权限",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation_1 = "tag.location.group_a:1"
create(table_name, replication_allocation_1)
replication_allocation_2 = "tag.location.group_b:1"
check_data(table_name)
table_name_2 = table_name + '_2'
create(table_name_2, replication_allocation_2)
check_data(table_name_2)
#create user
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
privilege_list = ['SELECT_PRIV']
client.grant(user_name, privilege_list, database_name)
kv = "'resource_tags.location'= ''"
client.set_properties(kv, user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#用户具有所有标签表的权限
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
sql = "select * from %s order by k1" % table_name_2
ret1 = client_tag.execute(sql)
util.check(ret1, ret2)
client.clean_user(user_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_set_property_and_alter_table_tag():
"""
{
"title": "test_sys_resource_tag:test_set_property_and_alter_table_tag",
"describe": "修改be节点标签,建表,创建用户并设置用户资源权限,修改表的副本标签使用户对表有权限,验证权限正确",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1"
create(table_name, replication_allocation)
check_data(table_name)
#create user
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
privilege_list = ['SELECT_PRIV']
client.grant(user_name, privilege_list, database_name)
kv = "'resource_tags.location'= 'group_b'"
client.set_properties(kv, user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#用户对该表无权限
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
#修改表的副本标签
backend_id = client.get_replica_backend_id(table_name)
client.schema_change(table_name, replication_allocation="tag.location.group_b:1")
ret = wait_replica_transfer(table_name, backend_id)
assert ret == True, 'REPLICA TRANSFER FAIL'
#用户具有该表权限
client_tag.use(database_name)
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
client.clean_user(user_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_set_property_and_alter_table_tag_no_property():
"""
{
"title": "test_sys_resource_tag:test_set_property_and_alter_table_tag_no_property",
"describe": "修改be节点标签,建表,创建用户并设置用户资源权限,修改表副本标签使用户对表无权限,验证权限设置正确",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation = "tag.location.group_a:1"
create(table_name, replication_allocation)
check_data(table_name)
#create user
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
privilege_list = ['SELECT_PRIV']
client.grant(user_name, privilege_list, database_name)
kv = "'resource_tags.location'= 'group_a'"
client.set_properties(kv, user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#用户对该表有权限
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
#修改表的副本标签
backend_id = client.get_replica_backend_id(table_name)
client.schema_change(table_name, replication_allocation="tag.location.group_b:1")
ret = wait_replica_transfer(table_name, backend_id)
assert ret == True, 'REPLICA TRANSFER FAIL'
check_data_2(table_name)
#bug,用户仍具有该表权限
client_tag.use(database_name)
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from (select * from test_query_qa.baseall union select k1+15 as \
k1,k2,k3,k4,k5,k6,k10,k11,k7,k8,k9 from test_query_qa.baseall)a order by k1')
util.check(ret1, ret2)
#util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
client.clean_user(user_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_alter_user_property():
"""
{
"title": "test_sys_resource_tag:test_alter_user_property",
"describe": "修改be节点标签,建表,创建用户并设置用户资源权限,修改该用户资源权限,验证资源权限修改正确",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
database_name, table_name, index_name = util.gen_num_format_name_list()
LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
client.clean(database_name)
client.create_database(database_name)
client.use(database_name)
tag_location_list = ['group_a', 'group_b']
for i in range(2):
client.modify_resource_tag(host_list[i], port, tag_location_list[i])
check_resource_tag(host_list[i], tag_location_list[i])
replication_allocation_1 = "tag.location.group_a:1"
create(table_name, replication_allocation_1)
replication_allocation_2 = "tag.location.group_b:1"
check_data(table_name)
table_name_2 = table_name + '_2'
create(table_name_2, replication_allocation_2)
check_data(table_name_2)
#create user
user_name = 'test_resource_tag'
client.clean_user(user_name)
client.create_user(user_name)
privilege_list = ['SELECT_PRIV']
client.grant(user_name, privilege_list, database_name)
kv = "'resource_tags.location'= 'group_a'"
client.set_properties(kv, user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#验证
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name
ret1 = client_tag.execute(sql)
ret2 = client.execute('select * from test_query_qa.baseall order by k1')
util.check(ret1, ret2)
sql = "select * from %s order by k1" % table_name_2
util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
#修改user权限
kv = "'resource_tags.location'= 'group_b'"
client.set_properties(kv, user_name)
client_tag = palo_client.get_client(config.fe_host, config.fe_query_port, user=user_name, password='', \
http_port=config.fe_http_port)
#验证
client_tag.use(database_name)
sql = "select * from %s order by k1" % table_name_2
ret1 = client_tag.execute(sql)
util.check(ret1, ret2)
sql = "select * from %s order by k1" % table_name
util.assert_return(False, 'have no queryable replicas', client_tag.execute, sql)
client.clean_user(user_name)
client.clean(database_name)
resource_tag_initialize(host_list, port)
def test_set_exec_mem_limit():
"""
{
"title": "test_sys_resource_tag:test_set_exec_mem_limit",
"describe": "设置exec_mem_limit,验证设置正确,对查询语句限制有效,仅在该连接内生效,仅支持正值",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
prefix = 'exec_mem_limit'
client.set_variables(prefix, '1G')
assert client.show_variables(prefix)[0][1] == '1073741824'
client.set_variables(prefix, '1234567890')
assert client.show_variables(prefix)[0][1] == '1234567890'
client_new = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
print(client_new.show_variables(prefix)[0][1])
assert client_new.show_variables(prefix)[0][1] == '2147483648'
util.assert_return(False, 'invalid data volumn expression', client.set_variables, prefix, '-2147483648')
util.assert_return(False, 'Data volumn must larger than 0', client.set_variables, prefix, '0')
util.assert_return(False, 'invalid data volumn expression', client.set_variables, prefix, 'a')
client.set_variables(prefix, '65536')
sql = 'select * from test_query_qa.test order by k1 limit 1'
util.assert_return(False, 'Memory limit exceeded', client.execute, sql)
# util.assert_return(False, 'failed mem consume', client.execute, sql)
client.set_variables(prefix, '2G')
def test_set_global_exec_mem_limit():
"""
{
"title": "test_sys_resource_tag:test_set_global_exec_mem_limit",
"describe": "设置golbal exec_mem_limit,验证设置正确,对查询语句限制有效,全局生效,仅支持正值",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
prefix = 'exec_mem_limit'
client.set_variables(prefix, '1G', is_global=True)
client_new = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
assert client_new.show_variables(prefix)[0][1] == '1073741824'
util.assert_return(False, 'invalid data volumn expression', client_new.set_variables, prefix, '-2147483648', \
is_global=True)
client.set_variables(prefix, '65536', is_global=True)
sql = 'select * from test_query_qa.test order by k1 limit 1'
client_new = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user,
password=config.fe_password, http_port=config.fe_http_port)
util.assert_return(False, 'Memory limit exceeded', client_new.execute, sql)
# util.assert_return(False, 'failed mem consume', client_new.execute, sql)
client.set_variables(prefix, '2G', is_global=True)
def test_set_sql_exec_mem_limit():
"""
{
"title": "test_sys_resource_tag:test_set_sql_exec_mem_limit",
"describe": "查询语句中设置exec_mem_limit,验证设置正确,对查询语句限制有效,仅对该语句有效,仅支持正值",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
prefix = 'exec_mem_limit'
client.set_variables(prefix, '2G')
sql_1 = 'select /*+ SET_VAR(exec_mem_limit=1073741824) */ * from test_query_qa.test order by k1 limit 1'
sql_2 = 'select * from test_query_qa.test order by k1 limit 1'
common.check2(client, sql_1, sql2=sql_2)
sql = 'select /*+ SET_VAR(exec_mem_limit=65536) */ * from test_query_qa.test order by k1 limit 1'
util.assert_return(False, 'Memory limit exceeded', client.execute, sql)
sql = 'select /*+ SET_VAR(exec_mem_limit=-2147483648) */ * from test_query_qa.test order by k1 limit 1'
util.assert_return(False, 'Syntax error', client.execute, sql)
client.set_variables(prefix, '2G')
def test_set_cpu_resource_limit():
"""
{
"title": "test_sys_resource_tag:test_set_cpu_resource_limit",
"describe": "设置cpu_resource_limit,验证设置正确,仅支持正值和默认值-1",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
prefix = 'cpu_resource_limit'
client.set_variables(prefix, '2')
assert client.show_variables(prefix)[0][1] == '2'
client.set_variables(prefix, '100')
assert client.show_variables(prefix)[0][1] == '100'
client.set_variables(prefix, '-1')
assert client.show_variables(prefix)[0][1] == '-1'
#bug待修复,参数不能设置为-1以外的负值
util.assert_return(True, '', client.set_variables, prefix, '-100')
#util.assert_return(False, 'Data volumn must larger than 0', client.set_variables, prefix, '-100')
util.assert_return(False, 'Incorrect argument type', client.set_variables, prefix, 'a')
def test_set_cpu_resource_limit_user():
"""
{
"title": "test_sys_resource_tag:test_set_cpu_resource_limit_user",
"describe": "创建用户,对用户设置cpu_resource_limit,验证设置正确,对该用户有效,仅支持正值和默认值-1",
"tag": "system,p1,fuzz"
}
"""
resource_tag_initialize(host_list, port)
prefix = 'cpu_resource_limit'
client.set_variables(prefix, '-1')
assert client.show_variables(prefix)[0][1] == '-1'
user_name = 'test_set_cpu_resource_limit'
client.clean_user(user_name)
client.create_user(user_name)
kv = "'cpu_resource_limit' = '2'"
client.set_properties(kv, user_name)
assert client.show_property('cpu_resource_limit', user_name)[0][1] == '2', 'set user cpu_resource_limit false'
kv = "'cpu_resource_limit' = '-1'"
client.set_properties(kv, user_name)
assert client.show_property('cpu_resource_limit', user_name)[0][1] == '-1', 'set user cpu_resource_limit false'
kv = "'cpu_resource_limit' = 'a'"
util.assert_return(False, "cpu_resource_limit is not number", client.set_properties, kv, user_name)
kv = "'cpu_resource_limit' = '-100'"
util.assert_return(False, "cpu_resource_limit is not valid", client.set_properties, kv, user_name)
def test_sql_exec_mem_limit():
"""
{
"title": "test_sys_resource_tag:test_sql_exec_mem_limit",
"describe": "查询语句设置exec_mem_limit的语法测试,验证支持各种select语法和窗口函数",
"tag": "system,p1"
}
"""
resource_tag_initialize(host_list, port)
prefix = 'exec_mem_limit'
client.set_variables(prefix, '2G')
sql_1 = 'select /*+ SET_VAR(exec_mem_limit=1073741824) */ k1,k2,k3,k7 from test_query_qa.test order by k2,k3 \
limit 100'
sql_2 = 'select k1,k2,k3,k7 from test_query_qa.test order by k2,k3 limit 100'
util.check(client.execute(sql_1), client.execute(sql_2))
sql_1 = 'select /*+ SET_VAR(exec_mem_limit=1073741824) */ * from test_query_qa.test where k3 > 0 and k2 < 0 \
order by k2,k3,k4 desc'
sql_2 = 'select * from test_query_qa.test where k3 > 0 and k2 < 0 order by k2,k3,k4 desc'
util.check(client.execute(sql_1), client.execute(sql_2))
sql_1 = 'select /*+ SET_VAR(exec_mem_limit=1073741824) */ count(*), max(k3) from test_query_qa.test group by k1 \
order by k1'
sql_2 = 'select count(*), max(k3) from test_query_qa.test group by k1 order by k1'
util.check(client.execute(sql_1), client.execute(sql_2))
sql_1 = 'select /*+ SET_VAR(exec_mem_limit=1073741824) */ * from test_query_qa.test t left join \
test_query_qa.baseall b on t.k1 = b.k1 order by t.k2,t.k3,t.k4 desc'
sql_2 = 'select * from test_query_qa.test t left join test_query_qa.baseall b on t.k1 = b.k1 \
order by t.k2,t.k3,t.k4 desc'
util.check(client.execute(sql_1), client.execute(sql_2))
sql_1 = 'select /*+ SET_VAR(exec_mem_limit=1073741824) */ a.k1,a.k2 from (select * from test_query_qa.test \
order by k2,k3) a order by a.k1,a.k2'
sql_2 = 'select a.k1,a.k2 from (select * from test_query_qa.test order by k2,k3) a order by a.k1,a.k2'
util.check(client.execute(sql_1), client.execute(sql_2))
def teardown_module():
"""
tearDown
"""
resource_tag_initialize(host_list, port)