# blob: 187d7cc50b08200ca23fa5b4ce28ca4d0774c593
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
############################################################################
#
# @file test_sys_materialized_view_2.py
# @date 2020-08-03 14:36:10
# 比物化视图一期增加count,hll_union, bitmap_union三种function
#############################################################################
"""
import sys
import time
from data import schema as DATA
from data import load_file as FILE
sys.path.append("../")
from lib import palo_config
from lib import palo_client
from lib import util
from lib import common
from lib.palo_job import DescInfo
from lib.palo_job import RollupJob
from lib.palo_job import LoadJob
config = palo_config.config
LOG = palo_client.LOG
L = palo_client.L
broker_info = palo_config.broker_info
check_db = 'mv_check_db'
check_agg_tb = 'mv_check_agg_tb'
def setup_module():
    """Prepare the shared check database used by every MV comparison test.

    First tries to read the pre-built check table; when it is missing or
    does not hold the expected 15 rows of the baseall data set, the
    workspace is (re)created, the aggregate check table is built and the
    baseall file is loaded into it via broker load.
    """
    # NOTE(review): check_dup_tb is declared global but never assigned in the
    # visible code — confirm whether it is still needed.
    global check_db, check_dup_tb, check_agg_tb
    try:
        client = palo_client.get_client(config.fe_host, config.fe_query_port, user=config.fe_user,
                                        password=config.fe_password)
        ret = client.execute('select * from %s.%s' % (check_db, check_agg_tb))
        # the baseall data set contains exactly 15 rows
        assert len(ret) == 15, 'need to init check db'
    except Exception:
        # check db missing or incomplete: rebuild it from scratch
        client = common.create_workspace(check_db)
        distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
        ret = client.create_table(check_agg_tb, DATA.datatype_column_list, distribution_info=distribution_info)
        assert ret, 'create table failed'
        column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
        set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                    'k12=v12', 'k0=v7', 'k5=v4*101', 'k13=hll_hash(v2)', 'k14=to_bitmap(v1)']
        data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, check_agg_tb,
                                                  column_name_list=column_name_list, set_list=set_list)
        ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
        assert ret, 'load data failed'
def teardown_module():
    """Module-level teardown hook; nothing needs cleaning up."""
    return
def test_create_agg_mv():
    """
    {
    "title": "test_create_agg_mv",
    "describe": "测试创建agg表的物化视图,验证创建成功,查询命中,结果正确",
    "tag": "function,p0"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    agg_tb = table_name + '_agg'
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    ret = client.create_table(agg_tb, DATA.datatype_column_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101', 'k13=hll_hash(v2)', 'k14=to_bitmap(abs(v2))']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, agg_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'broker load failed'
    # create materialized view mv1 and verify its schema
    sql = 'select k6, k7 from %s group by k6,k7' % agg_tb
    assert client.create_materialized_view(agg_tb, 'mv1', sql, is_wait=True), 'create mv failed'
    assert client.get_index(agg_tb, 'mv1'), 'get mv1 failed'
    ret = client.get_index_schema(agg_tb, 'mv1')
    # BUGFIX: failure message used to say DECIMAL(9,3) while the check is DECIMALV3(9,3)
    assert 'DECIMALV3(9,3)' == util.get_attr_condition_value(ret, DescInfo.Field, 'k6', DescInfo.Type), \
        'k6 type expect DECIMALV3(9,3)'
    # poll (up to 600s) until the query plan is routed to mv1
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        rollup = common.get_explain_rollup(client, sql)
        if rollup == ['mv1']:
            break
    assert rollup == ['mv1'], 'use wrong rollup %s, expect mv1' % rollup
    # create materialized view mv2 with max/sum/hll_union/bitmap_union aggregates
    sql = 'select k4, k2, max(k11), sum(k12), hll_union(k13), bitmap_union(k14) from %s group by k4, k2' % agg_tb
    assert client.create_materialized_view(agg_tb, 'mv2', sql, is_wait=True), 'create mv2 failed'
    assert client.get_index(agg_tb, 'mv2'), 'get mv2 failed'
    ret = client.get_index_schema(agg_tb, 'mv2')
    assert 'DOUBLE' == util.get_attr_condition_value(ret, DescInfo.Field, 'k12', DescInfo.Type), \
        'k12 type expect DOUBLE'
    assert 'HLL' == util.get_attr_condition_value(ret, DescInfo.Field, 'k13', DescInfo.Type), \
        'k13 type expect HLL'
    assert 'BITMAP' == util.get_attr_condition_value(ret, DescInfo.Field, 'k14', DescInfo.Type), \
        'k14 type expect BITMAP'
    sql = 'select k4, max(k11), sum(k12), hll_union_agg(k13), bitmap_union(k14) from %s ' \
          'group by k4, k2 order by k4, k2' % agg_tb
    # poll (up to 600s) until the query plan is routed to mv2
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        rollup = common.get_explain_rollup(client, sql)
        if rollup == ['mv2']:
            break
    assert rollup == ['mv2'], 'use wrong rollup %s, expect mv2' % rollup
    # compare against the pre-built check table
    sql2 = 'select k4, max(k11), sum(k12), hll_union_agg(k13), bitmap_union(k14) from %s.%s ' \
           'group by k4, k2 order by k4, k2' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql, sql2=sql2)
    client.clean(database_name)
def test_create_dup_max_mv():
    """
    {
    "title": "test_create_dup_max_mv",
    "describe": "测试创建dup表的物化视图,各种类型的max聚合类型,验证创建成功,查询命中物化视图,结果正确",
    "tag": "function,p0"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 2)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    column_fields = util.get_attr(DATA.datatype_column_no_agg_list, 0)
    column_type = util.get_attr(DATA.datatype_column_no_agg_list, 1)
    # load data
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'batch load failed'
    # create materialized view & verify its schema
    sql = 'select k0, max(k1), max(k2), max(k3), max(k4), max(k5), max(k6), max(k7), max(k8), max(k9), ' \
          'max(k10), max(k11), max(k12) from %s group by k0' % dup_tb
    max_mv = 'max_mv'
    assert client.create_materialized_view(dup_tb, max_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, max_mv), 'get mv failed'
    ret = client.get_index_schema(dup_tb, max_mv)
    assert column_fields == util.get_attr(ret, DescInfo.Field), 'check field failed, expect: %s' % column_fields
    # BUGFIX: typo "feild" corrected in the failure message
    assert column_type == util.get_attr(ret, DescInfo.Type), 'check field type failed, expect: %s' % column_type
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if max_mv in mv_list:
            break
    assert max_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (max_mv, mv_list)
    # compare against the pre-built check table
    sql2 = 'select k0, max(k1), max(k2), max(k3), max(k4), max(k5), max(k6), max(k7), max(k8), max(k9), ' \
           'max(k10), max(k11), max(k12) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_dup_min_mv():
    """
    {
    "title": "test_create_dup_min_mv",
    "describe": "测试创建dup表的物化视图,各种类型的min聚合类型,验证创建成功,查询命中物化视图,结果正确",
    "tag": "function,p0"
    }
    """
    # BUGFIX: metadata title used to say "test_create_agg_mv" (copy-paste error)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 2)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    column_fields = util.get_attr(DATA.datatype_column_no_agg_list, 0)
    column_type = util.get_attr(DATA.datatype_column_no_agg_list, 1)
    # load data
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # create materialized view & verify its schema
    sql = 'select k0, min(k1), min(k2), min(k3), min(k4), min(k5), min(k6), min(k7), min(k8), min(k9), ' \
          'min(k10), min(k11), min(k12) from %s group by k0' % dup_tb
    min_mv = 'min_mv'
    assert client.create_materialized_view(dup_tb, min_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, min_mv), 'get mv failed'
    ret = client.get_index_schema(dup_tb, min_mv)
    assert column_fields == util.get_attr(ret, DescInfo.Field), 'check field failed, expect %s' % column_fields
    assert column_type == util.get_attr(ret, DescInfo.Type), 'check field type failed, expect %s' % column_type
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if min_mv in mv_list:
            break
    assert min_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (min_mv, mv_list)
    # compare against the pre-built check table
    sql2 = 'select k0, min(k1), min(k2), min(k3), min(k4), min(k5), min(k6), min(k7), min(k8), min(k9), ' \
           'min(k10), min(k11), min(k12) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_dup_sum_mv():
    """
    {
    "title": "test_create_dup_sum_mv",
    "describe": "测试创建dup表的物化视图,各种类型的sum聚合类型,验证创建成功,查询命中物化视图,结果正确",
    "tag": "function,p0"
    }
    """
    # BUGFIX: metadata title used to say "test_create_agg_mv" (copy-paste error)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    # load data
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # create materialized view & verify its schema
    sql = 'select k0, sum(k1), sum(k2), sum(k3), sum(k4), sum(k5), sum(k6), sum(k11), sum(k12) from %s ' \
          'group by k0' % dup_tb
    sum_mv = 'sum_mv'
    assert client.create_materialized_view(dup_tb, sum_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, sum_mv), 'get index failed'
    ret = client.get_index_schema(dup_tb, sum_mv)
    column_fields = [u'k0', u'k1', u'k2', u'k3', u'k4', u'k5', u'k6', u'k11', u'k12']
    # NOTE(review): k6 is expected as DECIMAL(9,3) here but DECIMALV3(9,3) in
    # test_create_agg_mv — confirm which representation the server reports.
    column_type = [u'BOOLEAN', u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT', u'LARGEINT',
                   u'DECIMAL(9,3)', u'DOUBLE', u'DOUBLE']
    assert column_fields == util.get_attr(ret, DescInfo.Field), 'check field failed, expect: %s' % column_fields
    assert column_type == util.get_attr(ret, DescInfo.Type), 'check field type failed, expect: %s' % column_type
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if sum_mv in mv_list:
            break
    assert sum_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (sum_mv, mv_list)
    # TODO: result comparison is disabled because of a known overflow issue
    sql2 = 'select k0, sum(k1), sum(k2), sum(k3), sum(k4), sum(k5), sum(k6), sum(k11), sum(k12) from %s.%s ' \
           'group by k0' % (check_db, check_agg_tb)
    # common.check2(client, sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_dup_count_mv():
    """
    {
    "title": "test_create_dup_count_mv",
    "describe": "测试创建dup表的物化视图,各种类型的count聚合类型,验证创建成功,查询命中物化视图,结果正确",
    "tag": "function,p0"
    }
    """
    # BUGFIX: metadata title used to say "test_create_agg_mv" (copy-paste error)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    # load data
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # create materialized view; count(x) columns are stored as CASE WHEN expressions
    sql = 'select k0, count(k1), count(k2), count(k3), count(k4), count(k5), count(k6), count(k7), ' \
          'count(k8), count(k9), count(k10), count(k11), count(k12) from %s group by k0' % dup_tb
    count_mv = 'count_mv'
    assert client.create_materialized_view(dup_tb, count_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, count_mv), 'get mv failed'
    ret = client.get_index_schema(dup_tb, count_mv)
    column_fields = [u'k0', u'CASE WHEN k1 IS NULL THEN 0 ELSE 1 END', u'CASE WHEN k2 IS NULL THEN 0 ELSE 1 END',
                     u'CASE WHEN k3 IS NULL THEN 0 ELSE 1 END', u'CASE WHEN k4 IS NULL THEN 0 ELSE 1 END',
                     u'CASE WHEN k5 IS NULL THEN 0 ELSE 1 END', u'CASE WHEN k6 IS NULL THEN 0 ELSE 1 END',
                     u'CASE WHEN k7 IS NULL THEN 0 ELSE 1 END', u'CASE WHEN k8 IS NULL THEN 0 ELSE 1 END',
                     u'CASE WHEN k9 IS NULL THEN 0 ELSE 1 END', u'CASE WHEN k10 IS NULL THEN 0 ELSE 1 END',
                     u'CASE WHEN k11 IS NULL THEN 0 ELSE 1 END', u'CASE WHEN k12 IS NULL THEN 0 ELSE 1 END']
    column_types = [u'BOOLEAN', u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT',
                    u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT', u'BIGINT']
    # strip backquotes so the comparison is quoting-insensitive
    real_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    assert column_fields == real_fields, 'check mv field failed, expect:%s, actual: %s' \
        % (column_fields, real_fields)
    assert column_types == util.get_attr(ret, DescInfo.Type), 'check mv field type failed, expect: %s' % column_types
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if count_mv in mv_list:
            break
    assert count_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (count_mv, mv_list)
    # compare against the pre-built check table
    sql2 = 'select k0, count(k1), count(k2), count(k3), count(k4), count(k5), count(k6), count(k7), ' \
           'count(k8), count(k9), count(k10), count(k11), count(k12) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_dup_hll_mv():
    """
    {
    "title": "test_create_dup_hll_mv",
    "describe": "测试创建dup表的物化视图,各种类型的hll_union聚合类型,验证创建成功,查询命中物化视图,结果正确",
    "tag": "function,p0"
    }
    """
    # BUGFIX: metadata title used to say "test_create_agg_mv" (copy-paste error)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create db
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    # load data
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # create mv & check
    sql = 'select k0, hll_union(hll_hash(k1)), hll_union(hll_hash(k2)), hll_union(hll_hash(k3)),' \
          ' hll_union(hll_hash(k4)), hll_union(hll_hash(k5)), ' \
          'hll_union(hll_hash(k7)), hll_union(hll_hash(k8)), hll_union(hll_hash(k9)), ' \
          'hll_union(hll_hash(k10)), hll_union(hll_hash(k11)), hll_union(hll_hash(k12)) ' \
          'from %s group by k0' % dup_tb
    hll_mv = 'hll_mv'
    assert client.create_materialized_view(dup_tb, hll_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, hll_mv), 'get mv failed'
    ret = client.get_index_schema(dup_tb, hll_mv)
    column_fields = ['k0', 'hll_hash(k1)', 'hll_hash(k2)', 'hll_hash(k3)', 'hll_hash(k4)', 'hll_hash(k5)',
                     'hll_hash(k7)', 'hll_hash(k8)', 'hll_hash(k9)', 'hll_hash(k10)', 'hll_hash(k11)',
                     'hll_hash(k12)']
    column_type = ['BOOLEAN', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL']
    # strip backquotes so the comparison is quoting-insensitive
    real_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    assert column_fields == real_fields, 'check mv field failed, expect %s' % column_fields
    assert column_type == util.get_attr(ret, DescInfo.Type), 'check mv field type failed, expect %s' % column_type
    # template with db/table placeholders: queried on both the new table and the check table
    sql = 'select k0, hll_union_agg(hll_hash(k1)), hll_union_agg(hll_hash(k2)), hll_union_agg(hll_hash(k3)),' \
          ' hll_union_agg(hll_hash(k4)), hll_union_agg(hll_hash(k5)), ' \
          'hll_union_agg(hll_hash(k7)), hll_union_agg(hll_hash(k8)), hll_union_agg(hll_hash(k9)), ' \
          'hll_union_agg(hll_hash(k10)), hll_union_agg(hll_hash(k11)), hll_union_agg(hll_hash(k12)) ' \
          'from %s.%s group by k0'
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql % (database_name, dup_tb))
        if hll_mv in mv_list:
            break
    assert hll_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (hll_mv, mv_list)
    common.check2(client, sql % (database_name, dup_tb), sql2=sql % (check_db, check_agg_tb), forced=True)
    client.clean(database_name)
def test_create_dup_hll_mv_1():
    """
    {
    "title": "test_create_dup_hll_mv_1",
    "describe": "测试创建dup表的物化视图,各种类型的hll_union聚合类型,验证创建成功,查询命中物化视图,结果正确",
    "tag": "function,p0"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    # create mv on the empty table, then load afterwards
    sql = 'select k0, hll_union(hll_hash(k1)), hll_union(hll_hash(k2)), hll_union(hll_hash(k3)),' \
          ' hll_union(hll_hash(k4)), hll_union(hll_hash(k5)), ' \
          'hll_union(hll_hash(k7)), hll_union(hll_hash(k8)), hll_union(hll_hash(k9)), ' \
          'hll_union(hll_hash(k10)), hll_union(hll_hash(k11)), hll_union(hll_hash(k12)) ' \
          'from %s group by k0' % dup_tb
    hll_mv = 'hll_mv'
    assert client.create_materialized_view(dup_tb, hll_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, hll_mv), 'get mv failed'
    ret = client.get_index_schema(dup_tb, hll_mv)
    # BUGFIX: removed leftover debug print() calls that dumped the schema to stdout
    column_fields = ['k0', 'hll_hash(k1)', 'hll_hash(k2)', 'hll_hash(k3)', 'hll_hash(k4)', 'hll_hash(k5)',
                     'hll_hash(k7)', 'hll_hash(k8)', 'hll_hash(k9)', 'hll_hash(k10)', 'hll_hash(k11)',
                     'hll_hash(k12)']
    column_type = ['BOOLEAN', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL', 'HLL']
    # strip backquotes so the comparison is quoting-insensitive
    real_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    assert column_fields == real_fields, 'check mv field failed, expect %s' % column_fields
    assert column_type == util.get_attr(ret, DescInfo.Type), 'check mv field type failed, expect %s' % column_type
    # load & check
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # template with db/table placeholders: queried on both the new table and the check table
    sql = 'select k0, hll_union_agg(hll_hash(k1)), hll_union_agg(hll_hash(k2)), hll_union_agg(hll_hash(k3)),' \
          ' hll_union_agg(hll_hash(k4)), hll_union_agg(hll_hash(k5)), ' \
          'hll_union_agg(hll_hash(k7)), hll_union_agg(hll_hash(k8)), hll_union_agg(hll_hash(k9)), ' \
          'hll_union_agg(hll_hash(k10)), hll_union_agg(hll_hash(k11)), hll_union_agg(hll_hash(k12)) ' \
          'from %s.%s group by k0'
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql % (database_name, dup_tb))
        if hll_mv in mv_list:
            break
    assert hll_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (hll_mv, mv_list)
    common.check2(client, sql % (database_name, dup_tb), sql2=sql % (check_db, check_agg_tb), forced=True)
    client.clean(database_name)
def test_create_dup_bitmap_mv():
    """
    {
    "title": "test_create_dup_bitmap_mv",
    "describe": "测试创建dup表的物化视图,各种类型的bitmap_union聚合类型,验证创建成功,查询命中物化视图,结果正确。to_bitmap只支持正整数",
    "tag": "function,p0,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(dup_tb), 'get table failed'
    assert ret, 'create table failed'
    # load non-negative values only, since to_bitmap supports positive integers
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=abs(v1)', 'k2=abs(v2)', 'k3=abs(v3)', 'k4=abs(cast(v4 as bigint))', 'k6=v6', 'k7=v7',
                'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11', 'k12=v12', 'k0=v7', 'k5=abs(v4)*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # expect an exception when creating the mv; currently the server does not
    # raise but cancels the alter job instead
    sql = 'select k0, bitmap_union(to_bitmap(k1)), bitmap_union(to_bitmap(k2)), bitmap_union(to_bitmap(k3)),' \
          ' bitmap_union(to_bitmap(k4)), bitmap_union(to_bitmap(k5)), bitmap_union(to_bitmap(k6)), ' \
          'bitmap_union(to_bitmap(k7)), bitmap_union(to_bitmap(k8)), bitmap_union(to_bitmap(k9)), ' \
          'bitmap_union(to_bitmap(k10)), bitmap_union(to_bitmap(k11)), bitmap_union(to_bitmap(k12)) ' \
          'from %s group by k0' % dup_tb
    bitmap_mv = 'bitmap_mv'
    flag = True
    try:
        ret = client.create_materialized_view(dup_tb, bitmap_mv, sql, is_wait=True)
        flag = False
    except Exception as e:
        print(str(e))
    assert flag, 'expect error'
    # restricted to integer columns the mv can be created
    sql = 'select k0, bitmap_union(to_bitmap(k1)), bitmap_union(to_bitmap(k2)), bitmap_union(to_bitmap(k3)), ' \
          'bitmap_union(to_bitmap(k4)) from %s group by k0' % dup_tb
    assert client.create_materialized_view(dup_tb, bitmap_mv, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, bitmap_mv), 'get mv failed'
    ret = client.get_index_schema(dup_tb, bitmap_mv)
    column_fields = ['k0', 'to_bitmap_with_check(k1)', 'to_bitmap_with_check(k2)',
                     'to_bitmap_with_check(k3)', 'to_bitmap_with_check(k4)']
    column_type = ['BOOLEAN', 'BITMAP', 'BITMAP', 'BITMAP', 'BITMAP']
    # strip backquotes so the comparison is quoting-insensitive
    real_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    assert column_fields == real_fields, 'check mv field failed, expect %s' % column_fields
    assert column_type == util.get_attr(ret, DescInfo.Type), 'check mv field type failed, expect %s' % column_type
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if bitmap_mv in mv_list:
            break
    assert bitmap_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (bitmap_mv, mv_list)
    # compare against the pre-built check table
    sql2 = 'select k0, bitmap_union(to_bitmap(abs(k1))), bitmap_union(to_bitmap(abs(k2))), ' \
           'bitmap_union(to_bitmap(abs(k3))), bitmap_union(to_bitmap(abs(k4))) from %s.%s group by k0' \
           % (check_db, check_agg_tb)
    common.check2(client, sql, sql2=sql2, forced=True)
    # count(distinct ...) should also be rewritten to hit the bitmap mv
    sql1 = 'select k0, count(distinct k1), count(distinct k2), count(distinct k3), count(distinct k4) ' \
           'from %s.%s group by k0' % (database_name, dup_tb)
    mv_list = common.get_explain_rollup(client, sql1)
    assert bitmap_mv in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (bitmap_mv, mv_list)
    sql2 = 'select k0, count(distinct abs(k1)), count(distinct abs(k2)), count(distinct abs(k3)), ' \
           'count(distinct abs(k4)) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_dup_bitmap_mv_1():
    """
    {
    "title": "test_create_dup_bitmap_mv_1",
    "describe": "测试dup表物化视图bitmap聚合,建表,创建物化视图,导入,校验",
    "tag": "function,p0"
    }
    """
    # BUGFIX: metadata title used to say "test_create_agg_mv_1" (copy-paste error)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    dup_tb = table_name + '_dup'
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(dup_tb, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    assert client.show_tables(dup_tb), 'get table failed'
    # create mv first, then load
    sql = 'select k0, bitmap_union(to_bitmap(k1)), bitmap_union(to_bitmap(k2)), bitmap_union(to_bitmap(k3)), ' \
          'bitmap_union(to_bitmap(k4)) from %s group by k0' % dup_tb
    assert client.create_materialized_view(dup_tb, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(dup_tb, index_name), 'get mv failed'
    # load & check; abs() keeps values non-negative for to_bitmap
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=abs(v1)', 'k2=abs(v2)', 'k3=abs(v3)', 'k4=abs(cast(v4 as bigint))', 'k6=v6', 'k7=v7',
                'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11', 'k12=v12', 'k0=v7', 'k5=abs(v4)*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, dup_tb,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # poll (up to 600s) until the query plan is routed to the mv
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if index_name in mv_list:
            break
    assert index_name in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (index_name, mv_list)
    # compare against the pre-built check table
    sql2 = 'select k0, bitmap_union(to_bitmap(abs(k1))), bitmap_union(to_bitmap(abs(k2))), ' \
           'bitmap_union(to_bitmap(abs(k3))), bitmap_union(to_bitmap(abs(k4))) from %s.%s group by k0' \
           % (check_db, check_agg_tb)
    common.check2(client, sql, sql2=sql2, forced=True)
    # count(distinct ...) should also be rewritten to hit the bitmap mv
    sql1 = 'select k0, count(distinct k1), count(distinct k2), count(distinct k3), count(distinct k4) ' \
           'from %s.%s group by k0' % (database_name, dup_tb)
    mv_list = common.get_explain_rollup(client, sql1)
    assert index_name in mv_list, 'check sql shoot mv failed.expect %s, actual: %s' % (index_name, mv_list)
    sql2 = 'select k0, count(distinct abs(k1)), count(distinct abs(k2)), count(distinct abs(k3)), ' \
           'count(distinct abs(k4)) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_dup_bitmap_mv_negative():
    """
    {
    "title": "test_create_dup_bitmap_mv_negative",
    "describe": "测试创建dup表的物化视图,bitmap_union聚合类型,只支持to_bitmap,创建mv成功后,导入负数失败",
    "tag": "function,p1,fuzz"
    }
    """
    # BUGFIX: metadata title used to say "test_create_agg_mv" (copy-paste error)
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv on the empty table — creation itself must succeed
    sql = 'select k0, bitmap_union(to_bitmap(k1)), bitmap_union(to_bitmap(k2)), bitmap_union(to_bitmap(k3)), ' \
          'bitmap_union(to_bitmap(k4)) from %s group by k0' % table_name
    # BUGFIX: the failure message used to read 'expect create mv failed' although
    # this assertion checks that creation SUCCEEDS
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'get mv failed'
    # loading negative values must fail because to_bitmap only accepts positive integers
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=abs(v1)', 'k2=v2', 'k3=v3', 'k4=cast(v4 as bigint)', 'k6=v6', 'k7=v7',
                'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11', 'k12=v12', 'k0=v7', 'k5=abs(v4)*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert not ret, 'expect load failed'
    # the block below is intentionally disabled: it only applies if the load succeeded
    """
    timeout = 600
    while timeout > 0:
        time.sleep(1)
        timeout -= 1
        mv_list = common.get_explain_rollup(client, sql)
        if index_name in mv_list:
            break
    assert index_name in mv_list, 'check sql shoot mv failed.expect %s, actural: %s' % (index_name, mv_list)
    sql1 = 'select k0, bitmap_union_count(to_bitmap(k1)), bitmap_union_count(to_bitmap(k2)), ' \
           'bitmap_union_count(to_bitmap(k3)), bitmap_union_count(to_bitmap(k4)) from %s group by k0' \
           % table_name
    sql2 = 'select k0, bitmap_union_count(to_bitmap(k1)), bitmap_union_count(to_bitmap(k2)), ' \
           'bitmap_union_count(to_bitmap(k3)), bitmap_union_count(to_bitmap(k4)) from %s.%s group by k0' \
           % (check_db, check_agg_tb)
    common.check2(client, sql1, sql2=sql2, forced=True)
    sql1 = 'select k0, count(distinct k1), count(distinct k2), count(distinct k3), ' \
           'count(distinct k4) from %s.%s group by k0' % (database_name, table_name)
    sql2 = 'select k0, count(distinct k1), count(distinct k2), count(distinct k3), ' \
           'count(distinct k4) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql1, sql2=sql2, forced=True)
    """
    client.clean(database_name)
def test_create_dup_bitmap_mv_negative_1():
    """
    {
    "title": "test_create_dup_bitmap_mv_negative_1",
    "describe": "duplicate table with bitmap_union mv: create table, load negative values, then creating the bitmap_union mv is expected to fail",
    "tag": "function,p1,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create a duplicate-key table distributed by k0
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # broker-load data that keeps negative values (k2/k3 are loaded unmodified)
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=abs(v1)', 'k2=v2', 'k3=v3', 'k4=cast(v4 as bigint)', 'k6=v6', 'k7=v7',
                'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11', 'k12=v12', 'k0=v7', 'k5=abs(v4)*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name,
                                              column_name_list=column_name_list, set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # to_bitmap() rejects negative input, so creating the mv over existing data must fail
    sql = 'select k0, bitmap_union(to_bitmap(k1)), bitmap_union(to_bitmap(k2)), bitmap_union(to_bitmap(k3)), ' \
          'bitmap_union(to_bitmap(k4)) from %s group by k0' % table_name
    assert not client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'expect create mv failed'
    client.clean(database_name)
def test_create_agg_mv_failed():
    """
    {
    "title": "test_create_agg_mv_failed",
    "describe": "restrictions on mv over an aggregate table: a column may appear multiple times, other functions are rejected, only select/group forms are supported, and the mv aggregation must match the table's value aggregation",
    "tag": "function,p1,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    agg_tb = table_name + '_agg'
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    assert client.create_table(agg_tb, DATA.datatype_column_list, distribution_info=distribution_info)
    # each case: (should_succeed, expected_message, mv_name, create-mv statement, wait_for_job)
    cases = [
        (True, '', 'mv1_sum',
         'select k1 ,k2, sum(k3) from %s group by k1, k2' % agg_tb, False),
        (False, 'Aggregate function require same with slot aggregate type', 'mv1',
         'select k1 ,k2, sum(k11) from %s group by k1, k2' % agg_tb, False),
        (True, '', 'mv1',
         'select k1 ,k2, k1, sum(k12) from %s group by k1, k2' % agg_tb, False),
        (False, 'select list expression not produced by aggregation output (missing from GROUP BY clause?): `k1`',
         'mv1', 'select k1 ,k2, sum(k12) from %s' % agg_tb, False),
        (False, 'The materialized view must contain at least one key column', 'mv1',
         'select sum(k12) from %s' % agg_tb, False),
        (False, 'The materialized view of aggregation or unique table must has grouping columns', 'mv1_lower',
         'select k2 ,k1, lower(k6) from %s' % agg_tb, False),
        (False, '', 'mv1_agg_type',
         'select k2 ,k1, k12 from %s group by k2, k1, k12' % agg_tb, False),
        (False, "must use with specific function, and don't support filter", 'mv1',
         'select k2 ,k1, k13 from %s group by k2, k1, k13' % agg_tb, False),
        (True, '', 'mv1_where',
         'select k2 ,k1, k3 from %s where k3 > 0 group by k2, k1, k3' % agg_tb, False),
        (True, '', 'mv1_sum_expr',
         'select k2 ,k1, k3, sum(k4 + k5) from %s group by k2, k1, k3' % agg_tb, False),
        (False, 'The materialized view of aggregation or unique table must has grouping columns', 'mv1_no_agg',
         'select k1, k2 from %s' % agg_tb, False),
        (True, '', 'mv1_not_key',
         'select k1, k2, sum(k4) from %s group by k1, k2' % agg_tb, False),
        (True, '', 'mv1_count',
         'select k1, k2, count(k4) from %s group by k1, k2' % agg_tb, True),
        (False, 'HLL_UNION, HLL_UNION_AGG, HLL_RAW_AGG and HLL_CARDINALITY\'s params must be hll column', 'mv1',
         'select k1, k2, hll_union(k4) from %s group by k1, k2' % agg_tb, False),
    ]
    for expected, message, mv_name, mv_sql, wait in cases:
        if wait:
            util.assert_return(expected, message, client.create_materialized_view,
                               agg_tb, mv_name, mv_sql, is_wait=True)
        else:
            util.assert_return(expected, message, client.create_materialized_view, agg_tb, mv_name, mv_sql)
    client.clean(database_name)
def test_create_dup_mv_failed():
    """
    {
    "title": "test_create_dup_mv_failed",
    "describe": "restrictions on mv over a duplicate table: a column may appear multiple times, unsupported functions are rejected, only plain select and select/group forms are allowed",
    "tag": "function,p1,fuzz"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    assert client.create_table(table_name, DATA.datatype_column_no_agg_list,
                               distribution_info=distribution_info)
    # each case: (should_succeed, expected_message, mv_name, create-mv statement, wait_for_job)
    cases = [
        (True, '', 'mv100',
         'select k1 ,k2, sum(k3), min(k3) from %s group by k1, k2' % table_name, True),
        (True, '', 'mv11',
         'select k4 ,k5, sum(k1), max(k2), min(k3) from %s group by k4, k5' % table_name, True),
        (True, '', 'mv101',
         'select k1 ,k2, k1, sum(k12) from %s group by k1, k2' % table_name, True),
        (False, 'select list expression not produced by aggregation output (missing from GROUP BY clause?): `k1`',
         'mv1', 'select k1 ,k2, sum(k12) from %s' % table_name, False),
        (False, 'The materialized view must contain at least one key column', 'mv1',
         'select sum(k12) from %s' % table_name, False),
        (True, '', 'mv102',
         'select k2 ,k1, lower(k6) from %s' % table_name, False),
        (True, '', 'mv12',
         'select k2 ,k1, k12 from %s' % table_name, True),
        (False, 'The where clause is not supported in add materialized view clause, expr:`k3` > 0', 'mv1',
         'select k2 ,k1, k3 from %s where k3 > 0' % table_name, False),
        (False, 'The function sum must match pattern:sum(column)', 'mv1',
         'select k2 ,k1, k3, sum(k4 + k5) from %s group by k2, k1, k3' % table_name, False),
        (False, 'The partition and distributed columns k4 must be key column in mv', 'mv1',
         'select k2 ,k1, k3, sum(k4) from %s group by k2, k1, k3' % table_name, False),
        # hll_union(hll_hash(column)) over a duplicate table is supported
        (True, 'ok', 'mv1',
         'select k2 ,k1, k3, hll_union(hll_hash(k6)) from %s group by k2, k1, k3' % table_name, True),
    ]
    for expected, message, mv_name, mv_sql, wait in cases:
        if wait:
            util.assert_return(expected, message, client.create_materialized_view,
                               table_name, mv_name, mv_sql, is_wait=True)
        else:
            util.assert_return(expected, message, client.create_materialized_view, table_name, mv_name, mv_sql)
    client.clean(database_name)
def test_broker_load_with_mv():
    """
    {
    "title": "test_broker_load_with_mv",
    "describe": "after creating a mv on a duplicate table, broker load succeeds and queries on both the base table and the mv return correct results",
    "tag": "function,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv covering max/min/sum/count/hll_union/bitmap_union
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k1)) from %s.%s group by k0'
    assert client.create_materialized_view(table_name, index_name, sql % (database_name, table_name),
                                           is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'get mv failed'
    # broker load
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    assert ret, 'load data failed'
    # check: the aggregate query must be routed to the mv and match the reference table
    mv_list = common.get_explain_rollup(client, sql % (database_name, table_name))
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, check_agg_tb), forced=True)
    sql = 'select k0, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, check_agg_tb))
    sql = 'select k0, max(k9), min(k11), sum(k12), count(k10), hll_union_agg(hll_hash(k7)), ' \
          'count(distinct k1) from %s.%s group by k0'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, check_agg_tb), forced=True)
    client.clean(database_name)
def test_stream_load_with_mv():
    """
    {
    "title": "test_stream_load_with_mv",
    "describe": "after creating a mv on a duplicate table, stream load succeeds and queries on both the base table and the mv return correct results",
    "tag": "function,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv covering max/min/sum/count/hll_union/bitmap_union
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k1)) from %s.%s group by k0'
    assert client.create_materialized_view(table_name, index_name, sql % (database_name, table_name),
                                           is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'get mv failed'
    # stream load (column mappings inline in the column list)
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12', 'k1=v1',
                        'k2=v2', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                        'k12=v12', 'k0=v7', 'k5=v4*101']
    ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list)
    assert ret, 'stream load failed'
    # check data: the aggregate query must hit the mv
    mv_list = common.get_explain_rollup(client, sql % (database_name, table_name))
    # fixed: the original message formatted mv_list twice and never showed the expected value
    assert mv_list == [index_name], 'expect mv: %s, actural mv: %s' % ([index_name], mv_list)
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, check_agg_tb), forced=True)
    sql = 'select k0, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s order by k1'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, check_agg_tb))
    sql = 'select k0, max(k9), min(k11), sum(k12), count(k10), hll_union_agg(hll_hash(k7)), ' \
          'count(distinct k1) from %s.%s group by k0'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, check_agg_tb), forced=True)
    client.clean(database_name)
def test_insert_with_mv():
    """
    {
    "title": "test_insert_with_mv",
    "describe": "after creating a mv on a duplicate table, insert data and verify that queries on both the base table and the mv return correct results",
    "tag": "function,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv covering max/min/sum/count/hll_union/bitmap_union
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k1)) from %s.%s group by k0'
    assert client.create_materialized_view(table_name, index_name, sql % (database_name, table_name), is_wait=True)
    assert client.get_index(table_name, index_name)
    # insert a single row
    isql = 'insert into %s.%s values(1, 0, 0, 0, 0, 0, 0, "true", "2020-01-01", "2020-01-01 09:00:00", ' \
           '"hello", 0.12, 0.13)' % (database_name, table_name)
    ret = client.execute(isql)
    assert ret == ()
    # check data: the aggregate query must hit the mv
    mv_list = common.get_explain_rollup(client, sql % (database_name, table_name))
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    sql2 = 'select 1, cast("2020-01-01 09:00:00" as datetime), 0.12, 0.13, 1, null, null'
    common.check2(client, sql1=sql % (database_name, table_name), sql2=sql2, forced=True)
    sql1 = 'select * from %s.%s' % (database_name, table_name)
    sql2 = 'select 1, 0, 0, 0, 0, "0", 0.0, "true", cast("2020-01-01" as date), ' \
           'cast("2020-01-01 09:00:00" as datetime), "hello", 0.12, 0.13'
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)
def test_null_with_mv():
    """
    {
    "title": "test_null_with_mv",
    "describe": "null values with materialized views, covering mv creation and query routing; unrelated to partitioned tables",
    "tag": "function,p0"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create a nullable duplicate table
    distribution_info = palo_client.DistributionInfo('HASH(k0)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=distribution_info, set_null=True)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create one mv per aggregation: max / min / sum / count / hll_union / bitmap_union
    sql1 = 'select k0, max(k1), max(k2), max(k3), max(k4), max(k5), max(k6), max(k7), max(k8), max(k9), ' \
           'max(k10), max(k11), max(k12) from %s group by k0' % table_name
    sql2 = 'select k0, min(k1), min(k2), min(k3), min(k4), min(k5), min(k6), min(k7), min(k8), min(k9), ' \
           'min(k10), min(k11), min(k12) from %s group by k0' % table_name
    sql3 = 'select k0, sum(k1), sum(k2), sum(k3), sum(k4), sum(k5), sum(k6), sum(k11), sum(k12) from %s ' \
           'group by k0' % table_name
    sql4 = 'select k0, count(k1), count(k2), count(k3), count(k4), count(k5), count(k6), count(k7), ' \
           'count(k8), count(k9), count(k10), count(k11), count(k12) from %s group by k0' % table_name
    sql5 = 'select k0, hll_union(hll_hash(k1)), hll_union(hll_hash(k2)), hll_union(hll_hash(k3)),' \
           ' hll_union(hll_hash(k4)), hll_union(hll_hash(k5)), ' \
           'hll_union(hll_hash(k7)), hll_union(hll_hash(k8)), hll_union(hll_hash(k9)), ' \
           'hll_union(hll_hash(k10)), hll_union(hll_hash(k11)), hll_union(hll_hash(k12)) ' \
           'from %s group by k0' % table_name
    sql6 = 'select k0, bitmap_union(to_bitmap(k1)), bitmap_union(to_bitmap(k2)), bitmap_union(to_bitmap(k3)), ' \
           'bitmap_union(to_bitmap(k4)) from %s group by k0' % table_name
    mv1, mv2, mv3, mv4, mv5, mv6 = 'mv_max', 'mv_min', 'mv_sum', 'mv_count', 'mv_hll', 'mv_bitmap'
    assert client.create_materialized_view(table_name, mv1, sql1, is_wait=True), 'create mv failed'
    assert client.create_materialized_view(table_name, mv2, sql2, is_wait=True), 'create mv failed'
    assert client.create_materialized_view(table_name, mv3, sql3, is_wait=True), 'create mv failed'
    assert client.create_materialized_view(table_name, mv4, sql4, is_wait=True), 'create mv failed'
    assert client.create_materialized_view(table_name, mv5, sql5, is_wait=True), 'create mv failed'
    assert client.create_materialized_view(table_name, mv6, sql6, is_wait=True), 'create mv failed'
    # insert one non-null row and two all-null-value rows
    insert_sql = 'insert into %s.%s values(1, 0, 0, 0, 0, 0, 0, "true", "2020-01-01", ' \
                 '"2020-01-01 09:00:00", "hello", 0.12, 0.13),' \
                 '(0, null, null, null, null, null, null, null, null, null, null, null, null), ' \
                 '(1, null, null, null, null, null, null, null, null, null, null, null, null)' \
                 % (database_name, table_name)
    rows, ret = client.execute(insert_sql, True)
    assert ret == (), 'insert data failed'
    assert rows == 3, 'expect 3 rows affected'
    # check: max/min ignore nulls, count only counts non-null values
    sql1_check = 'select 1, 0, 0, 0, 0, "0", 0, "true", cast("2020-01-01" as date), ' \
                 'cast("2020-01-01 09:00:00" as datetime), "hello", 0.12, 0.13 union ' \
                 'select 0, null, null, null, null, null, null, null, null, null, null, null, null'
    common.check2(client, sql1=sql1, sql2=sql1_check, forced=True)
    common.check2(client, sql1=sql2, sql2=sql1_check, forced=True)
    sql3_check = 'select 1, 0, 0, 0, 0, "0", 0, 0.12, 0.13 union ' \
                 'select 0, null, null, null, null, null, null, null, null'
    common.check2(client, sql1=sql3, sql2=sql3_check, forced=True)
    sql4_check = 'select 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 union select 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0'
    common.check2(client, sql1=sql4, sql2=sql4_check, forced=True)
    # hll/bitmap mvs are validated through hll_union_agg / count(distinct) queries
    sql5 = 'select k0, hll_union_agg(hll_hash(k1)), hll_union_agg(hll_hash(k2)), hll_union_agg(hll_hash(k3)),' \
           ' hll_union_agg(hll_hash(k4)), hll_union_agg(hll_hash(k5)), ' \
           'hll_union_agg(hll_hash(k7)), hll_union_agg(hll_hash(k8)), hll_union_agg(hll_hash(k9)), ' \
           'hll_union_agg(hll_hash(k10)), hll_union_agg(hll_hash(k11)), hll_union_agg(hll_hash(k12)) ' \
           'from %s group by k0' % table_name
    sql5_check = 'select 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 union select 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0'
    common.check2(client, sql1=sql5, sql2=sql5_check, forced=True)
    sql6 = 'select k0, count(distinct k1), count(distinct k2), count(distinct k3), count(distinct k4) ' \
           'from %s group by k0' % table_name
    sql6_check = 'select 1, 1, 1, 1, 1 union select 0, 0, 0, 0, 0'
    common.check2(client, sql1=sql6, sql2=sql6_check, forced=True)
    client.clean(database_name)
def test_add_partiton_with_mv():
    """
    {
    "title": "test_add_partiton_with_mv",
    "describe": "after creating a mv, add a partition and load data into it; the data and query results stay correct",
    "tag": "system,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create a range-partitioned duplicate table (initially covering k1 < 10)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3"], ["-10", "0", "10"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=distribution_info, partition_info=partition_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k2)) from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'get mv failed'
    # first load: rows outside the partition range are filtered, hence max_filter_ratio
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    # add a partition extending the range to k1 < 20, then load the same data again
    assert client.add_partition(table_name, 'p4', '20'), 'add partition failed'
    assert client.get_partition(table_name, 'p4'), 'get new partition failed'
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info)
    # fixed: the original left this load result unchecked
    assert ret, 'load data after adding partition failed'
    # check data: the mv query hits the rollup and matches the reference table
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    sql2 = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
           'bitmap_union(to_bitmap(abs(k1))) from (select k0, k1, k9, k11, k12, k7, k10 from %s.%s ' \
           'union all select k0, k1, k9, k11, k12, k7, k10 from %s.%s where k1 < 10) sub ' \
           'group by k0' % (check_db, check_agg_tb, check_db, check_agg_tb)
    common.check2(client, sql1=sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_drop_partition_with_mv():
    """
    {
    "title": "test_drop_partition_with_mv",
    "describe": "after creating a mv, drop a partition; the remaining data and query results stay correct",
    "tag": "system,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create a range-partitioned duplicate table covering k1 < 20
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list,
                              distribution_info=distribution_info, partition_info=partition_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'get mv failed'
    # load (rows outside the partition range are filtered, hence max_filter_ratio)
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    # drop partition p4 so that only rows with k1 < 10 remain
    assert client.drop_partition(table_name, 'p4'), 'drop partition failed'
    assert not client.get_partition(table_name, 'p4'), 'get dropped partition'
    # check data against the reference table restricted to k1 < 10
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    sql2 = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
           'bitmap_union(to_bitmap(abs(k2))) from (select * from %s.%s where k1 < 10) sub group by k0' \
           % (check_db, check_agg_tb)
    common.check2(client, sql1=sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_drop_value_column_in_mv():
    """
    {
    "title": "test_drop_value_column_in_mv",
    "describe": "after creating a mv, drop a value column from it; the drop succeeds, the mv schema is updated, queries still hit the mv and the data is correct",
    "tag": "system,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'get table failed'
    # create mv
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'get mv failed'
    # load
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    # drop the k12 value column from the mv
    ret = client.schema_change_drop_column(table_name, ['k12'], index_name, is_wait_job=True)
    assert ret, 'drop column failed'
    # without sum(k12) the reduced aggregate query must still hit the mv
    sql = 'SELECT k0, max(k9), min(k11), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    mv_list = common.get_explain_rollup(client, sql)
    # fixed: the original formatted the expected/actual values in the wrong order
    assert mv_list == [index_name], 'shoot mv error. expect %s, actural %s' % ([index_name], mv_list)
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k0, k1, abs(k2), k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s ' \
           'order by k1' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # verify the mv schema no longer contains the k12 column
    ret = client.get_index_schema(table_name, index_name)
    column_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    expect_fields = ['k0', 'k9', 'k11', 'CASE WHEN k10 IS NULL THEN 0 ELSE 1 END',
                     'hll_hash(k7)', 'to_bitmap_with_check(k2)']
    assert column_fields == expect_fields, 'mv field check error, expect %s' % expect_fields
    client.clean(database_name)
def test_drop_key_column_in_mv():
    """
    {
    "title": "test_drop_key_column_in_mv",
    "describe": "duplicate table: after creating a mv, drop a key column from it; the drop succeeds, the mv schema is updated, queries still hit the mv and the data is correct",
    "tag": "system,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table'
    # create a mv keyed on (k0, k8)
    sql = 'SELECT k0, k8, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k2)) from %s.%s group by k0, k8'
    assert client.create_materialized_view(table_name, index_name, sql % (database_name, table_name),
                                           is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'can not get mv: %s' % index_name
    # load
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    mv_list = common.get_explain_rollup(client, sql % (database_name, table_name))
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    # drop the k0 key column from the mv
    ret = client.schema_change_drop_column(table_name, ['k0'], index_name, is_wait_job=True)
    assert ret, 'drop mv k0 failed'
    # the reduced aggregate query grouped only by k8 must still hit the mv
    sql = 'SELECT k8, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k8' % (database_name, table_name)
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % index_name
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k0, k1, abs(k2), k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s ' \
           'order by k1' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # verify the mv schema no longer contains the k0 key column
    ret = client.get_index_schema(table_name, index_name)
    column_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    expect_fields = ['k8', 'k9', 'k11', 'k12', 'CASE WHEN k10 IS NULL THEN 0 ELSE 1 END',
                     'hll_hash(k7)', 'to_bitmap_with_check(k2)']
    assert column_fields == expect_fields, 'mv field check error'
    client.clean(database_name)
def test_add_key_column_in_mv():
    """
    {
    "title": "test_add_key_column_in_mv",
    "describe": "duplicate table: after creating a mv, add a key column to it; the add succeeds, the mv schema is updated, queries still hit the mv and the data is correct",
    "tag": "system,p1"
    }
    """
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table'
    # create mv keyed on k0
    sql = 'SELECT k0, max(k8), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0'
    assert client.create_materialized_view(table_name, index_name, sql % (database_name, table_name),
                                           is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'can not get mv: %s' % index_name
    # load
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    mv_list = common.get_explain_rollup(client, sql % (database_name, table_name))
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    # add a new key column (default 0) to the mv
    add_column_list = [('k_add', 'int', 'key', '0')]
    ret = client.schema_change_add_column(table_name, add_column_list, to_table_name=index_name, is_wait_job=True)
    # fixed: the original message said 'drop mv k0 failed' for an ADD COLUMN operation
    assert ret, 'add key column k_add failed'
    # a query grouping by the new key column must still hit the mv
    sql = 'SELECT k0, k_add, max(k8), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k2)) from %s.%s group by k0, k_add' % (database_name, table_name)
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % index_name
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k0, k1, abs(k2), 0, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s order by k1' % (
        check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # verify the mv schema now contains k_add
    ret = client.get_index_schema(table_name, index_name)
    column_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    expect_fields = ['k0', 'k_add', 'k8', 'k11', 'k12', 'CASE WHEN k10 IS NULL THEN 0 ELSE 1 END',
                     'hll_hash(k7)', 'to_bitmap_with_check(k2)']
    assert column_fields == expect_fields, 'mv field check error: expect %s' % expect_fields
    client.clean(database_name)
def test_modify_column_type_in_mv():
    """
    {
    "title": "test_modify_column_type_in_mv",
    "describe": "duplicate表,修改mv中的列类型",
    "tag": "system,p1"
    }
    """
    # Duplicate-model table: change the type of key column k1 (which is also a
    # key of the mv) and verify the planner still routes the aggregate query to
    # the mv and the data stays correct.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info)
    assert client.show_tables(table_name), 'can not get table'
    assert ret, 'create table failed'
    # mv with count / hll_union / bitmap_union in addition to max/min/sum
    sql = 'SELECT k1, max(k8), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k1' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'can not get mv: %s' % index_name
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    # change k1's type to INT; the mv contains k1, so the change must propagate
    ret = client.schema_change_modify_column(table_name, 'k1', 'INT KEY', is_wait_job=True)
    assert ret, 'modify column k1 type failed'
    # the same aggregate query must still be served by the mv after the change
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % index_name
    sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
    sql2 = 'select k0, k1, abs(k2), k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s order by k1' % (
        check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    # verify the mv's materialized column expressions
    ret = client.get_index_schema(table_name, index_name)
    column_fields = [x.replace('`', '') for x in util.get_attr(ret, DescInfo.Field)]
    expect_fields = ['k1', 'k8', 'k11', 'k12', 'CASE WHEN k10 IS NULL THEN 0 ELSE 1 END', 'hll_hash(k7)',
                     'to_bitmap_with_check(k2)']
    assert column_fields == expect_fields, 'mv field check error, expect: %s' % expect_fields
    client.clean(database_name)
def test_modify_column_order_in_mv():
    """
    {
    "title": "test_modify_column_order_in_mv",
    "describe": "duplicate表,修改mv中的列顺序",
    "tag": "function,p1,fuzz"
    }
    """
    # Duplicate-model table: try to reorder the columns of an mv that contains
    # hll_union / bitmap_union / count aggregates; the operation is currently
    # unsupported and is expected to be rejected.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert client.show_tables(table_name), 'can not get table'
    assert ret, 'create table failed'
    sql = 'SELECT k0, k1, max(k8), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
          'bitmap_union(to_bitmap(k2)) from %s.%s group by k1, k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'can not get mv: %s' % index_name
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    modify_column_list = ['k1', 'k0', 'mv_hll_union_k7', 'mv_bitmap_union_k2', 'k8', 'k11', 'k12']
    reorder_rejected = True
    try:
        client.schema_change_order_column(table_name, column_name_list=modify_column_list,
                                          from_table_name=index_name, is_wait_job=True)
        reorder_rejected = False
    except Exception as e:
        print(str(e))
    # Reordering an mv with hll_union/bitmap_union/count aggregates is not
    # supported for now; the statement fails during syntax analysis.
    assert reorder_rejected, 'expect reorder columns of mv to fail'
    client.clean(database_name)
def test_alter_not_support():
    """
    {
    "title": "test_alter_not_support",
    "describe": "duplicate表,验证不支持的alter操作",
    "tag": "function,p0,fuzz"
    }
    """
    # Each ALTER statement below must be rejected with the paired error
    # message fragment while the table carries an mv with hll_union /
    # bitmap_union aggregates.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    # init db & table
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert ret, 'create table failed'
    assert client.show_tables(table_name), 'can not get table'
    # create mv
    sql = 'SELECT k0, k1, k3, max(k8), min(k11), sum(k12), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0, k1, k3' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True), 'create mv failed'
    assert client.get_index(table_name, index_name), 'can not get mv: %s' % index_name
    # load data & check
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret, 'load data failed'
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % mv_list
    # (alter statement, expected error fragment) pairs; each must fail
    rejected_alters = [
        ('ALTER TABLE %s.%s ADD COLUMN v_add HLL TO %s' % (database_name, table_name, index_name),
         'Bitmap and hll type have to use aggregate function'),
        ('ALTER TABLE %s.%s ADD COLUMN k1 INT KEY TO %s' % (database_name, table_name, index_name),
         'Can not add column which already exists in base table: k1'),
        ('ALTER TABLE %s.%s ADD COLUMN v_add INT TO %s' % (database_name, table_name, index_name),
         'Please add non-key column on base table directly'),
        ('ALTER TABLE %s.%s ADD COLUMN v_add INT SUM DEFAULT "0" TO %s' % (database_name, table_name, index_name),
         'Can not assign aggregation method on column in Duplicate data model table: v_add'),
        ('ALTER TABLE %s.%s MODIFY COLUMN k1 BIGINT KEY' % (database_name, table_name),
         'Can not modify partition column[k1]'),
        # message fragment formerly checked here was 'Invalid column order';
        # only the failure itself is asserted now
        ('ALTER TABLE %s.%s MODIFY COLUMN k1 BIGINT' % (database_name, table_name),
         ' '),
        ('ALTER TABLE %s.%s MODIFY COLUMN k3 BIGINT DEFAULT "1"' % (database_name, table_name),
         'Can not change default value'),
        ('ALTER TABLE %s.%s MODIFY COLUMN k2 BIGINT DEFAULT NULL' % (database_name, table_name),
         'Can not change aggregation'),
        ('ALTER TABLE %s.%s MODIFY COLUMN k3 BIGINT KEY DEFAULT NULL FROM %s' % (database_name,
                                                                                 table_name, index_name),
         'Do not need to specify index name when just modifying column type'),
    ]
    for alter_sql, expect_msg in rejected_alters:
        util.assert_return(False, expect_msg, client.execute, alter_sql)
    client.clean(database_name)
def test_delete_with_mv():
    """
    {
    "title": "test_delete_with_mv",
    "describe": "duplicate表,创建物化视图后,删除某几行记录,验证删除成功,数据正确",
    "tag": "function,p1"
    }
    """
    # With an mv (count / hll_union / bitmap_union) in place, delete rows
    # from one partition, then verify mv routing and result correctness.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    dist_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    part_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    created = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=dist_info,
                                  partition_info=part_info)
    assert client.show_tables(table_name)
    assert created
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True)
    assert client.get_index(table_name, index_name)
    src_columns = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    column_mapping = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                      'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    load_info = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=src_columns,
                                         set_list=column_mapping)
    assert client.batch_load(util.get_label(), load_info, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    # drop every row with k0 = 0 from partition p3
    assert client.delete(table_name, [('k0', '=', '0')], 'p3')
    hit_mvs = common.get_explain_rollup(client, sql)
    assert hit_mvs == [index_name], 'expect rollup: %s' % index_name
    # expected result: the check table minus the deleted rows of p3
    sql2 = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), ' \
           'bitmap_union(to_bitmap(abs(k2))) from %s.%s where (k0 != 0 and k1 < 10) or ' \
           'k1 >= 10 group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql, sql2=sql2, forced=True)
    sql1 = 'SELECT * FROM %s.%s order by k1' % (database_name, table_name)
    sql2 = 'SELECT k0, k1, abs(k2), k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 FROM %s.%s ' \
           'WHERE (k0 != 0 and k1 < 10) or k1 >= 10 order by k1' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql1, sql2=sql2)
    client.clean(database_name)
def test_delete_with_mv_failed():
    """
    {
    "title": "test_delete_with_mv_failed",
    "describe": "duplicate表,物化视图,删除操作的限制",
    "tag": "function,p1,fuzz"
    }
    """
    # Delete restrictions on a table carrying an mv with count / hll_union /
    # bitmap_union: the delete condition column must exist in the mv index.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert client.show_tables(table_name)
    assert ret
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True)
    assert client.get_index(table_name, index_name)
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret
    # deleting on k0 (the mv's grouping column) must succeed
    ret = client.delete(table_name, [('k0', '=', '0')], 'p3')
    assert ret, 'delete on mv group-by column failed'
    # k1 does not appear in the mv, so the condition is rejected
    msg = "Unknown column 'k1' in 'index"
    util.assert_return(False, msg, client.delete, table_name, [('k1', '<', '0')], 'p1')
    # k2 appears in the mv only inside bitmap_union(to_bitmap(k2)); the delete
    # must also fail (only the failure is asserted, not the exact message)
    msg = " "
    util.assert_return(False, msg, client.delete, table_name, [('k2', '<', '0')], 'p1')
    client.clean(database_name)
def test_create_mv_when_load():
    """
    {
    "title": "test_create_mv_when_load",
    "describe": "duplicate表,验证创建物化视图的同时向表进行数据导入,验证创建成功",
    "tag": "system,p0,stability"
    }
    """
    # Create the mv WITHOUT waiting, keep submitting loads while the rollup job
    # runs, then verify the mv finishes and aggregates all finished loads.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert client.show_tables(table_name)
    assert ret
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    # no is_wait: the rollup job runs concurrently with the loads below
    assert client.create_materialized_view(table_name, index_name, sql)
    state = RollupJob(client.show_rollup_job()[0]).get_state()
    while state != 'FINISHED' and state != 'CANCELLED':
        client.batch_load(util.get_label(), data_desc_list, broker=broker_info, is_wait=True)
        time.sleep(2)
        state = RollupJob(client.show_rollup_job()[0]).get_state()
        print(state)
    assert 'FINISHED' == RollupJob(client.show_rollup_job()[0]).get_state()
    client.wait_table_load_job()
    ret = client.show_load()
    # the expected sum/count values scale with the number of FINISHED loads
    finished_job = util.get_attr_condition_list(ret, LoadJob.State, 'FINISHED', LoadJob.Label)
    count = len(finished_job)
    # check
    mv_list = common.get_explain_rollup(client, sql)
    assert mv_list == [index_name], 'expect rollup: %s' % index_name
    sql2 = 'SELECT k0, max(k9), min(k11), sum(k12) * %s, count(k10) * %s, hll_union(hll_hash(k7)), ' \
           'bitmap_union(to_bitmap(abs(k2))) from %s.%s group by k0' % (count, count, check_db, check_agg_tb)
    common.check2(client, sql1=sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_create_mv_based_on_mv():
    """
    {
    "title": "test_create_mv_based_on_mv",
    "describe": "duplicate表,基于物化视图创建物化视图",
    "tag": "function,p1"
    }
    """
    # Create a second mv whose aggregates are a subset of an existing mv's,
    # and verify the planner switches to the new (narrower) mv.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    # create table
    dist_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    part_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    created = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=dist_info,
                                  partition_info=part_info)
    assert client.show_tables(table_name)
    assert created
    # load data
    src_columns = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    column_mapping = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10',
                      'k11=v11', 'k12=v12', 'k0=v7', 'k5=v4*101']
    load_info = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=src_columns,
                                         set_list=column_mapping)
    assert client.batch_load(util.get_label(), load_info, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    # first mv with the full aggregate set
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True)
    assert client.get_index(table_name, index_name)
    assert common.get_explain_rollup(client, sql) == [index_name], 'expect rollup: %s' % index_name
    # a narrower query is first served by the existing mv ...
    sql = 'SELECT k0, max(k9), min(k11), count(k10) from %s.%s group by k0' % (database_name, table_name)
    assert common.get_explain_rollup(client, sql) == [index_name], 'expect rollup: %s' % index_name
    # ... and by the new mv once that exists
    second_index = 'new_' + index_name
    assert client.create_materialized_view(table_name, second_index, sql, is_wait=True)
    assert client.get_index(table_name, second_index)
    assert common.get_explain_rollup(client, sql) == [second_index], 'expect rollup: %s' % second_index
    sql2 = 'SELECT k0, max(k9), min(k11), count(k10) from %s.%s group by k0' % (check_db, check_agg_tb)
    common.check2(client, sql1=sql, sql2=sql2, forced=True)
    client.clean(database_name)
def test_drop_mv():
    """
    {
    "title": "test_drop_mv",
    "describe": "duplicate表,验证删除物化视图",
    "tag": "function,p0"
    }
    """
    # Create an mv with count / hll_union / bitmap_union, load data, drop the
    # mv, and verify it is gone while the base table data is intact.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    distribution_info = palo_client.DistributionInfo('HASH(k4, k5)', 10)
    partition_info = palo_client.PartitionInfo("k1", ["p1", "p2", "p3", "p4"], ["-10", "0", "10", "20"])
    ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, distribution_info=distribution_info,
                              partition_info=partition_info)
    assert client.show_tables(table_name)
    assert ret
    sql = 'SELECT k0, max(k9), min(k11), sum(k12), count(k10), hll_union(hll_hash(k7)), bitmap_union(to_bitmap(k2)) ' \
          'from %s.%s group by k0' % (database_name, table_name)
    assert client.create_materialized_view(table_name, index_name, sql, is_wait=True)
    assert client.get_index(table_name, index_name)
    column_name_list = ['v1', 'v2', 'v3', 'v4', 'v6', 'v7', 'v8', 'v9', 'v10', 'v11', 'v12']
    set_list = ['k1=v1', 'k2=abs(v2)', 'k3=v3', 'k4=v4', 'k6=v6', 'k7=v7', 'k8=v8', 'k9=v9', 'k10=v10', 'k11=v11',
                'k12=v12', 'k0=v7', 'k5=v4*101']
    data_desc_list = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_name_list,
                                              set_list=set_list)
    ret = client.batch_load(util.get_label(), data_desc_list, is_wait=True, broker=broker_info, max_filter_ratio=0.5)
    assert ret
    # an empty result tuple means the DROP MATERIALIZED VIEW succeeded
    ret = client.drop_materialized_view(database_name, table_name, index_name)
    assert ret == ()
    assert not client.get_index(table_name, index_name)
    sql1 = 'select * from %s' % table_name
    # NOTE(review): sql2 reads from the table under test itself, so this is
    # close to comparing the table with itself (k2 was loaded as abs(v2), so
    # abs(k2) == k2); consider comparing against the shared check table.
    sql2 = 'select k0, k1, abs(k2), k3, k4, k5, k6, k7, k8, k9, k10, k11, k12 from %s.%s' % (database_name, table_name)
    common.check2(client, sql1=sql1, sql2=sql2, forced=True)
    client.clean(database_name)
def test_issue_5164():
    """
    {
    "title": "test_issue_5164",
    "describe": "5164",
    "tag": "function,p0"
    }
    """
    # Regression test for issue 5164: query an mv-covered column inside a
    # CTE combined with UNION ALL and verify the result is correct.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    schema = [('record_id', 'int'), ('seller_id', 'int'), ('store_id', 'int'),
              ('sale_date', 'date'), ('sale_amt', 'bigint')]
    assert client.create_table(table_name, schema)
    mv_sql = 'select store_id from %s' % table_name
    assert client.create_materialized_view(table_name, index_name, mv_sql, is_wait=True)
    insert_sql = 'insert into %s(record_id, seller_id, store_id, sale_date, sale_amt) ' \
                 'values(1, 1, 1, "2020-12-30",1)' % table_name
    assert client.execute(insert_sql) == ()
    query = "with t as (SELECT store_id as id , 'kks' as aaa, sum(seller_id) as seller, sum(sale_amt) as a " \
            "FROM %s GROUP BY store_id) " \
            "select id, t1 from(select id, a as t1 from t union all select id, aaa as t1 from t) k2" % table_name
    expected = "select 1, '1' union select 1, 'kks'"
    common.check2(client, sql1=query, sql2=expected, forced=True)
    client.clean(database_name)
def test_issue_7361():
    """
    {
    "title": "test_issue_7361",
    "describe": "Mv rewrite bug may cause SQL failure",
    "tag": "function,p0"
    }
    """
    # Regression test for issue 7361: count(k2) on a nullable column combined
    # with count(1) must be rewritten correctly against the mv.
    database_name, table_name, index_name = util.gen_num_format_name_list()
    LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name))
    client = common.create_workspace(database_name)
    schema = [('k1', 'int'), ('k2', 'int')]
    assert client.create_table(table_name, schema, set_null=True), 'create table failed'
    mv_sql = 'select k1, count(k2) from %s group by k1' % table_name
    assert client.create_materialized_view(table_name, index_name, mv_sql, is_wait=True), 'create mv failed'
    # one batch with all k2 set, one with nulls, so count(k2) != count(1)
    for insert_sql in ('insert into %s values(1, 1), (2, 2), (3, 3)' % table_name,
                      'insert into %s values(1, 1), (2, null), (3, null)' % table_name):
        assert client.execute(insert_sql) == ()
    query = 'select k1, count(k2) / count(1) from %s group by k1' % table_name
    expected = 'select 1, 1 union select 2, 0.5 union select 3, 0.5'
    common.check2(client, sql1=query, sql2=expected, forced=True)
    client.clean(database_name)
if __name__ == '__main__':
    # Running this file directly only initializes the shared check database
    # (tests themselves are collected and run by the test framework).
    setup_module()