#!/usr/bin/env python
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| """ |
| /*************************************************************************** |
| * |
| * @file test_sys_partition_schema_change.py |
| * @date 2015/02/04 15:26:21 |
| * @brief This file is a test file for Palo schema changing. |
| * |
| **************************************************************************/ |
| 对于unique表的导入来说,每条数据都是有全部的key的,相当于是按照全key进行数据delete的 |
| """ |
| |
| import os |
| import sys |
| import time |
| |
| sys.path.append("../") |
| sys.path.append("../../") |
| from data import schema as DATA |
| from data import load_file as FILE |
| from lib import palo_config |
| from lib import palo_client |
| from lib import util |
| from lib import common |
| from lib import palo_job |
| from lib import kafka_config |
| |
| config = palo_config.config |
| LOG = palo_client.LOG |
| L = palo_client.L |
| broker_info = palo_config.broker_info |
| TOPIC = 'routine-load-delete-%s' % config.fe_query_port |
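# the FE query port suffix presumably keeps the topic unique per test cluster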
| |
| |
| def setup_module(): |
| """set up""" |
| global check_db, baseall_tb |
| baseall_tb = 'baseall' |
    check_db = os.environ.get('FE_DB', 'test_query_qa')
| |
| |
| def teardown_module(): |
| """tear down""" |
| pass |
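

# Every test below repeats the same enable-and-verify sequence inline; the
# following is a minimal refactoring sketch of that sequence (reference only,
# the tests do not call it):
def _prepare_batch_delete_sketch(client, table_name):
    """Enable batch delete and verify the hidden delete-sign column exists."""
    try:
        # enabling may fail if the feature is already on; ignore in that case
        client.enable_feature_batch_delete(table_name)
    except Exception:
        pass
    assert client.set_variables('show_hidden_columns', 1)
    ret = client.desc_table(table_name, is_all=True)
    assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field,
                                         '__DORIS_DELETE_SIGN__')
    assert client.set_variables('show_hidden_columns', 0)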
| |
| |
| def test_delete_broker_basic(): |
| """ |
| { |
| "title": "test_delete_broker_basic", |
| "describe": "验证broker load的delete的基本功能", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
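    # with show_hidden_columns on, DESC should expose the batch-delete sign column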
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE load into an empty table; the table should remain empty
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load; expect the table to be empty
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
    assert ret, 'broker load failed'
| sql = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb)) |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_delete_broker_column_set(): |
| """ |
| { |
| "title": "test_delete_broker_column_set", |
| "describe": "验证broker load的delete的列设置", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE load with column list and SET clauses into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12'] |
| set_list = ['k0=k7', 'k5=k4'] |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE', |
| column_name_list=column_name_list, set_list=set_list) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load; expect the table to be empty
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, |
| column_name_list=column_name_list, set_list=set_list) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
    assert ret, 'broker load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE', |
| column_name_list=column_name_list, set_list=set_list) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_delete_broker_filter_ratio(): |
| """ |
| { |
| "title": "test_delete_broker_column_set", |
| "describe": "验证broker load的delete的数据过滤", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE load with column list, SET, WHERE and partition filters into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12'] |
| set_list = ['k0=k7', 'k5=k4'] |
| where = 'k1 > 8' |
| partitions = ['p3'] |
| load_data_desc1 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE', |
| column_name_list=column_name_list, set_list=set_list, |
| where_clause=where, partition_list=partitions) |
| ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load again
| load_data_desc2 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, |
| column_name_list=column_name_list, set_list=set_list) |
| ret = client.batch_load(util.get_label(), load_data_desc2, broker=broker_info, is_wait=True) |
    assert ret, 'broker load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1) |
| assert ret, 'broker load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1 != 9 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| |
| client.clean(database_name) |
| |
| |
| def test_merge_broker_basic(): |
| """ |
| { |
| "title": "test_merge_broker_basic", |
| "describe": "验证broker load的merge的基本功能", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE into an empty table; the DELETE ON predicate matches all rows. TODO: set show_hidden_columns to view the hidden deleted rows
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 > 0') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data via MERGE whose DELETE ON predicate matches no rows; all rows are loaded
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 = 0') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
    assert ret, 'broker load failed'
| sql = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb)) |
    # load again; the DELETE ON predicate matches part of the rows, matched rows are deleted and the rest stay unchanged
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k2 > 0') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k2 <= 0 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_broker_set_columns(): |
| """ |
| { |
| "title": "test_merge_broker_set_columns", |
| "describe": "验证broker load的merge列设置", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE with column list and SET clauses into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12'] |
| set_list = ['k0=k7', 'k5=k4'] |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list, |
| delete_on_predicates='k1 > 0') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data via MERGE whose DELETE ON predicate matches no rows; all rows are loaded
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list, |
| delete_on_predicates='k1=0') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
    assert ret, 'broker load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list, |
| delete_on_predicates='k7="false"') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k6 != "false" order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_broker_filter_ratio(): |
| """ |
| { |
| "title": "test_merge_broker_filter_ratio", |
| "describe": "验证broker load的merge数据导入", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE with column list, SET, WHERE and partition filters into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12'] |
| set_list = ['k0=k7', 'k5=k4'] |
| where = 'k1 > 8' |
| partitions = ['p3'] |
| load_data_desc1 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list, |
| where_clause=where, partition_list=partitions, |
| delete_on_predicates='k1 > 0') |
| ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data via MERGE whose DELETE ON predicate matches no rows
| load_data_desc2 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list, |
| delete_on_predicates='k1=0') |
| ret = client.batch_load(util.get_label(), load_data_desc2, broker=broker_info, is_wait=True) |
    assert ret, 'broker load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| load_data_desc1 = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list, |
| where_clause='k5 is not null', partition_list=['p1', 'p2', 'p3', 'p4'], |
| delete_on_predicates='k8 > "2000-01-01"') |
| ret = client.batch_load(util.get_label(), load_data_desc1, broker=broker_info, is_wait=True, max_filter_ratio=1) |
| assert ret, 'broker load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k10 <= "20000101" order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| |
| client.clean(database_name) |
| |
| |
| def test_delete_stream_basic(): |
| """ |
| { |
| "title": "test_delete_stream_basic", |
| "describe": "验证stream load的delete的基本功能", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE load into an empty table; the table should remain empty
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load; expect the table to be empty
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='APPEND') |
    assert ret, 'stream load failed'
| sql = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb)) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_delete_stream_column_set(): |
| """ |
| { |
| "title": "test_delete_stream_column_set", |
| "describe": "验证stream load的delete的列设置", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE load with column mapping into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='DELETE') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load; expect the table to be empty
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='APPEND') |
    assert ret, 'stream load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='DELETE') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_delete_stream_filter_ratio(): |
| """ |
| { |
| "title": "test_delete_stream_filter_ratio", |
| "describe": "验证stream load的delete的数据过滤", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE load with column mapping, WHERE and partition filters into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| where = 'k1 > 8' |
| partitions = ['p3'] |
| |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| where_filter=where, partition_list=partitions, merge_type='DELETE', max_filter_ratio=1) |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load again
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='APPEND') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| where_filter=where, partition_list=partitions, merge_type='DELETE', max_filter_ratio=1) |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1 != 9 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| |
| client.clean(database_name) |
| |
| |
| def test_merge_stream_basic(): |
| """ |
| { |
| "title": "test_merge_stream_basic", |
| "describe": "验证stream load的merge的基本功能", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE into an empty table; the DELETE predicate matches all rows. TODO: set show_hidden_columns to view the hidden deleted rows
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k1>0') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data via MERGE whose DELETE predicate matches no rows; all rows are loaded
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k1=0') |
| assert ret, 'stream load failed' |
| sql = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb)) |
    # load again; the DELETE predicate matches part of the rows, matched rows are deleted and the rest stay unchanged
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k2>0') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k2 <= 0 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_stream_set_columns(): |
| """ |
| { |
| "title": "test_merge_stream_set_columns", |
| "describe": "验证broker load的merge导入", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE with column mapping into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='MERGE', delete='k1>0') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data via MERGE whose DELETE predicate matches no rows; all rows are loaded
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='MERGE', delete='k1=0') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='MERGE', delete='k7="false"') |
    assert ret, 'stream load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k6 != "false" order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_stream_filter_ratio(): |
| """ |
| { |
| "title": "test_merge_stream_filter_ratio", |
| "describe": "验证stream load的merge数据过滤", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE with column mapping, WHERE and partition filters into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| where = 'k1 > 8' |
| partitions = ['p3'] |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| where_filter=where, partition_list=partitions, max_filter_ratio=1, |
| merge_type='MERGE', delete='k1>0') |
| assert ret, 'stream load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data via MERGE whose DELETE predicate matches no rows
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='MERGE', delete='k1=0') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| where_filter='k5 is not null', partition_list=['p1', 'p2', 'p3', 'p4'], |
| merge_type='MERGE', delete='k8 > "2000-01-01"') |
    assert ret, 'stream load failed'
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k10 <= "20000101" order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| |
| client.clean(database_name) |
| |
| |
| def test_delete_routine_basic(): |
| """ |
| { |
| "title": "test_delete_routine_basic", |
| "describe": "验证routine load的delete的基本功能", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| # enable batch delete & check |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # 1. DELETE load into an empty table; the table should remain empty
| # create routine load |
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
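    # pin the job to the topic's current offsets so it only consumes messages produced after this point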
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('DELETE') |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| routine_load_job = palo_job.RoutineLoadJob(client.show_routine_load(routine_load_job_name)[0]) |
| ret = (routine_load_job.get_merge_type() == 'DELETE') |
| common.assert_stop_routine_load(ret, client, routine_load_job_name, 'expect delete merge type') |
| client.wait_routine_load_state(routine_load_job_name) |
| # send kafka data & check, expect empty table |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| ret = client.select_all(table_name) |
| common.assert_stop_routine_load(ret == (), client, routine_load_job_name, 'check error') |
    # 2. load data into the table, then DELETE load; expect the table to be empty
| # stream load and check |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='APPEND') |
| common.assert_stop_routine_load(ret, client, routine_load_job_name, 'stream load failed') |
| sql = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb)) |
| # send kafka data & check |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 30) |
| ret = client.select_all(table_name) |
| client.stop_routine_load(routine_load_job_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_delete_routine_column_set(): |
| """ |
| { |
| "title": "test_delete_routine_column_set", |
| "describe": "验证routine load的delete,设置column", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE routine load with column mapping into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('DELETE') |
    routine_load_property.set_column_mapping(column_name_list)
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table, then DELETE load via kafka; expect the table to be empty
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='APPEND') |
| common.assert_stop_routine_load(ret, client, routine_load_job_name, 'stream load failed') |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 30) |
| ret = client.select_all(table_name) |
| client.stop_routine_load(routine_load_job_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_delete_routine_filter_ratio(): |
| """ |
| { |
| "title": "test_delete_routine_filter_ratio", |
| "describe": "验证routine load的delete的数据过滤", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # DELETE routine load with column mapping, WHERE and partition filters into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('DELETE') |
| routine_load_property.set_column_mapping(column_name_list) |
| routine_load_property.set_where_predicates('k1 > 8') |
| routine_load_property.set_partitions(['p3']) |
| routine_load_property.set_max_error_number(15) |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 1) |
| ret = client.select_all(table_name) |
| common.assert_stop_routine_load(ret == (), client, routine_load_job_name, 'check failed') |
    # load data into the table, then DELETE load via kafka
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='APPEND') |
| common.assert_stop_routine_load(ret, client, routine_load_job_name, 'stream load failed') |
| |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 2) |
| ret = client.show_routine_load(routine_load_job_name) |
| routine_load_job = palo_job.RoutineLoadJob(ret[0]) |
    # job statistics fetched for inspection only; not asserted below
    error_rows = routine_load_job.get_error_rows()
    state = routine_load_job.get_state()
| client.stop_routine_load(routine_load_job_name) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1 != 9 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_routine_basic(): |
| """ |
| { |
| "title": "test_merge_routine_basic", |
| "describe": "验证routine load的merge基本功能", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE into an empty table; rows matching the DELETE ON predicate are marked deleted, the rest are loaded. TODO: set show_hidden_columns to view the hidden deleted rows
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('MERGE') |
| routine_load_property.set_delete_on_predicates('k1 % 3 = 0') |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| client.stop_routine_load(routine_load_job_name) |
| ret = client.select_all(table_name) |
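    # assuming the standard baseall dataset (15 rows, k1 = 1..15): 5 rows match k1 % 3 = 0, so 10 remain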
| assert len(ret) == 10, 'check failed' |
    # load data via MERGE whose DELETE predicate matches no rows; all rows are loaded
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete='k1=0') |
| assert ret, 'stream load failed' |
| sql = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql % (database_name, table_name), sql2=sql % (check_db, baseall_tb)) |
    # load again; the DELETE ON predicate matches part of the rows, matched rows are deleted and the rest stay unchanged
| routine_load_job_name = util.get_label() |
| routine_load_property.set_delete_on_predicates('k9 > 0') |
    # refresh kafka partitions and offsets for the new job
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| client.stop_routine_load(routine_load_job_name) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k9 <= 0 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_routine_set_columns(): |
| """ |
| { |
| "title": "test_merge_routine_set_columns", |
| "describe": "验证routine load的merge的列设置", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE with column mapping into an empty table; the DELETE ON predicate matches all rows
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('MERGE') |
| routine_load_property.set_delete_on_predicates('k1 > 0') |
| routine_load_property.set_column_mapping(column_name_list) |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| client.stop_routine_load(routine_load_job_name) |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table via stream load APPEND
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='APPEND') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| routine_load_job_name = util.get_label() |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_delete_on_predicates('k11<0') |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| client.stop_routine_load(routine_load_job_name) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k8 >= 0 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_merge_routine_filter_ratio(): |
| """ |
| { |
| "title": "test_merge_routine_filter_ratio", |
| "describe": "验证routine load的merge的数据过滤", |
| "tag": "p1,function" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
    # MERGE routine load with column mapping, WHERE and partition filters into an empty table
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| where = 'k1 > 8' |
| partitions = ['p3'] |
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('MERGE') |
| routine_load_property.set_delete_on_predicates('k1 > 0') |
| routine_load_property.set_column_mapping(column_name_list) |
| routine_load_property.set_where_predicates(where) |
| routine_load_property.set_partitions(partitions) |
| routine_load_property.set_max_error_number(15) |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 1) |
| client.stop_routine_load(routine_load_job_name) |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
    # load data into the table via stream load APPEND
| ret = client.stream_load(table_name, FILE.baseall_local_file, column_name_list=column_name_list, |
| merge_type='APPEND') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
    # create another MERGE routine load
| routine_load_job_name = util.get_label() |
| routine_load_property.set_delete_on_predicates('k9 > "2000-01-01 00:00:00"') |
| routine_load_property.set_where_predicates('k5 is not null') |
| routine_load_property.set_partitions(['p1', 'p2', 'p3', 'p4']) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert ret, 'routine load create failed' |
| client.wait_routine_load_state(routine_load_job_name) |
| |
| kafka_config.send_to_kafka(TOPIC, '../qe/baseall.txt') |
| client.wait_routine_load_commit(routine_load_job_name, 15) |
| client.stop_routine_load(routine_load_job_name) |
| |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, k2, k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k11 <= "20000101" order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_delete_with_delete_on(): |
| """ |
| { |
| "title": "test_delete_with_delete_on", |
| "describe": "验证当delete与delete on条件连用时报错", |
| "tag": "p1,function,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
    except Exception:
        # enabling may fail if batch delete is already on; ignore
        pass
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
| # broker load failed |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='DELETE', |
| delete_on_predicates='k1 > 0') |
| msg = 'not support DELETE ON clause when merge type is not MERGE' |
| util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info) |
| # stream load failed |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE', delete='k1>0') |
| assert not ret, 'expect stream load failed' |
| # routine load failed |
| routine_load_job_name = util.get_label() |
| routine_load_property = palo_client.RoutineLoadProperty() |
| routine_load_property.set_kafka_broker_list(kafka_config.kafka_broker_list) |
| routine_load_property.set_kafka_topic(TOPIC) |
| partition_offset = kafka_config.get_topic_offset(TOPIC) |
| routine_load_property.set_kafka_partitions(','.join(partition_offset.keys())) |
| routine_load_property.set_kafka_offsets(','.join(partition_offset.values())) |
| routine_load_property.set_merge_type('DELETE') |
| routine_load_property.set_delete_on_predicates('k1 > 0') |
| ret = client.routine_load(table_name, routine_load_job_name, routine_load_property=routine_load_property) |
| assert not ret, 'expect create routine load failed' |
| client.clean(database_name) |
| |
| |
| def test_merge_with_delete_on(): |
| """ |
| { |
| "title": "test_merge_with_delete_on", |
| "describe": "验证merge必须带有delete on条件,否则报错;测试delete on条件,尤其是和set连用时", |
| "tag": "p1,function,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.datatype_column_no_agg_list, |
| partition_info=DATA.baseall_tinyint_partition_info, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.datatype_column_uniq_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
| # load fail without delete on |
| column_name_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', 'k10', 'k11', 'k12'] |
| set_list = ['k0=k7', 'k5=k4'] |
| stream_column_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4'] |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| column_name_list=column_name_list, set_list=set_list) |
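| # the expected message appears to be matched verbatim against the server
| # response, spelling ("Excepted") included, so it is kept as-is here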
| msg = 'Excepted DELETE ON clause when merge type is MERGE' |
| util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', |
| column_name_list=stream_column_list) |
| assert not ret, 'expect stream load failed' |
| # set: k0=k6, delete: k0 = true -> fail |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, |
| merge_type='MERGE', column_name_list=column_name_list, |
| set_list=set_list, delete_on_predicates='k0=true') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert not ret, 'expect failed. unknown reference column, column=__DORIS_DELETE_SIGN__, reference=k0' |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', |
| column_name_list=stream_column_list, delete='k0=true') |
| assert not ret, 'expect failed. unknown reference column, column=__DORIS_DELETE_SIGN__, reference=k0' |
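| # the DELETE ON expression appears to be resolved against the source columns
| # of the load, before SET mappings apply: referencing the derived column k0
| # fails above, while k1/k2 below succeed even though k2 is itself remapped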
| # set: k2=k2/2+1, delete: k2 & k1 -> succ |
| set_list = ['k0=k7', 'k5=k4', 'k2=k2/2+1'] |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, |
| merge_type='MERGE', column_name_list=column_name_list, |
| set_list=set_list, delete_on_predicates='k2>0 and abs(k1) = 1') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, cast(k2/2 + 1 as int), k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1!=1 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| stream_column_list = ['k1', 'k2', 'k3', 'k4', 'k6', 'k7', 'k8', 'k9', |
| 'k10', 'k11', 'k12', 'k0=k7', 'k5=k4', 'k2=k2/2 + 1'] |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', |
| column_name_list=stream_column_list, delete='k2 <= 0 or k1 not in (1)') |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s' % (database_name, table_name) |
| sql2 = 'select case k6 when "true" then 1 when "false" then 0 end as k0, k1, cast(k2/2 + 1 as int), k3, k4, ' \ |
| 'k4, k5, k6, k10, k11, k7, k8, k9 from %s.%s where k1=1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_delete_merge_special(): |
| """ |
| { |
| "title": "test_delete_merge_special", |
| "describe": "delete & merge 特殊值", |
| "tag": "p1,system,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.char_normal_column_no_agg_list, |
| distribution_info=DATA.hash_distribution_info, |
| keys_desc=DATA.unique_key, set_null=True) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert client.set_variables('show_hidden_columns', 0) |
| # load |
| load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='DELETE') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
| |
| load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 is not null') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed'
| ret = client.select_all(table_name) |
| assert ret == ((None, None),) |
| |
| load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 is null') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret1 = client.select_all(table_name) |
| ret2 = ((u'hello', u'hello'), (u'H', u'H'), (u'hello,hello', u'hello,hello'), (u'h', u'h'), |
| (u'\u4ed3\u5e93', u'\u5b89\u5168'), (u'', u'')) |
| util.check(ret1, ret2, True) |
| # when k1 in the file is NULL, the delete on condition k1="仓库" evaluates to
| # NULL (neither true nor false), so the row is treated as bad data and
| # filtered; max_filter_ratio must be set to tolerate it
| load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1="仓库"') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert not ret, 'expect broker load failed' |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.2) |
| assert ret, 'broker load failed' |
| ret1 = client.select_all(table_name) |
| ret2 = ((u'H', u'H'), (u'hello,hello', u'hello,hello'), (u'h', u'h'), (u'', u''), (u'hello', u'hello')) |
| util.check(ret1, ret2, True) |
| |
| load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| load_data_desc = palo_client.LoadDataInfo(FILE.test_char_hdfs_file, table_name, merge_type='DELETE') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == (), 'check failed' |
| client.clean(database_name) |
| |
| |
| def test_enable_batch_delete(): |
| """ |
| { |
| "title": "test_enable_batch_delete", |
| "describe": "多次执行enable,结果正确。enable后,执行drop column,然后导入验证。未enable的时候delete load。agg和duplicate表agg", |
| "tag": "p1,system,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| client.set_variables('show_hidden_columns', 1) |
| ret = client.schema_change_drop_column(table_name, ['__DORIS_DELETE_SIGN__'], is_wait_job=True) |
| assert ret |
| except Exception as e: |
| pass |
| # without enabling batch delete, a delete load reports an error
| msg = 'load by MERGE or DELETE need to upgrade table to support batch delete' |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 > 0') |
| util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info) |
| # after enabling, the load succeeds
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| print(client.show_variables('show_hidden_columns')) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 1, 2 from %s.%s order by k1' % (check_db, baseall_tb) |
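| # trailing constants: 1 is __DORIS_DELETE_SIGN__ (delete on k1 > 0 marks
| # every row deleted) and 2 is assumed to be the __DORIS_VERSION_COL__ value
| # after this first load; deleted rows stay visible under show_hidden_columns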
| common.check2(client, sql1=sql1, sql2=sql2) |
| # enabling a second time fails
| msg = 'Can not enable batch delete support, already supported batch delete.' |
| util.assert_return(False, msg, client.enable_feature_batch_delete, table_name) |
| # dropping the hidden columns succeeds
| assert client.set_variables('show_hidden_columns', 1)
| ret = client.schema_change_drop_column(table_name, ['__DORIS_DELETE_SIGN__', '__DORIS_VERSION_COL__'], |
| is_wait_job=True) |
| assert ret |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') is None |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') is None |
| msg = 'load by MERGE or DELETE need to upgrade table to support batch delete' |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 > 0') |
| util.assert_return(False, msg, client.batch_load, util.get_label(), load_data_desc, broker=broker_info) |
| client.clean(database_name) |
| |
| |
| def test_delete_merge_rollup_1(): |
| """ |
| { |
| "title": "test_delete_merge_rollup_1", |
| "describe": "enable,导入,创建rollup,导入", |
| "tag": "p1,system" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key, |
| enable_unique_key_merge_on_write="false") |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 = 1') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1 = 1, 2 from %s.%s order by k1' \ |
| % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k10', 'k11'] |
| ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True) |
| assert ret |
| ret = client.desc_table(table_name, is_all=True) |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__' |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__' |
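| # __DORIS_DELETE_SIGN__ is replicated to the rollup so that delete loads
| # take effect there as well; __DORIS_VERSION_COL__ appears to exist only on
| # the base table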
| sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| assert client.set_variables('show_hidden_columns', 0) |
| sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s where k1 != 1 order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 != 1') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| sql1 = 'select %s from %s.%s' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s WHERE k1 = 1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| assert client.set_variables('show_hidden_columns', 1) |
| sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| client.clean(database_name) |
| |
| |
| def test_delete_merge_rollup_2(): |
| """ |
| { |
| "title": "test_delete_merge_rollup_2", |
| "describe": "建表,创建rollup,导入,enable,导入", |
| "tag": "p1,system" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| # create table, create rollup & load
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key, |
| enable_unique_key_merge_on_write="false") |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k10', 'k11'] |
| ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True) |
| assert ret |
| assert client.get_index(table_name, index_name) |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='APPEND') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql1 % (database_name, table_name), sql2=sql1 % (check_db, baseall_tb)) |
| sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| # enable |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__' |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__' |
| |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 2 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| |
| assert client.set_variables('show_hidden_columns', 0) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 > 10') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| sql1 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), database_name, table_name) |
| sql2 = 'select %s from %s.%s where k1 <= 10 order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| |
| assert client.set_variables('show_hidden_columns', 1) |
| sql2 = 'select %s from %s.%s order by k1' % (','.join(rollup_list), check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| idx = common.get_explain_rollup(client, sql1) |
| assert index_name in idx |
| client.clean(database_name) |
| |
| |
| def test_add_column_delete_load(): |
| """ |
| { |
| "title": "test_add_column_delete_load", |
| "describe": "带rollup的表,加列不影响delete load", |
| "tag": "p1,system" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| # create table
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key, |
| enable_unique_key_merge_on_write="false") |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| # create rollup (materialized view)
| rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k6', 'k7', 'k10', 'k11'] |
| ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True) |
| assert ret |
| assert client.get_index(table_name, index_name) |
| # load & verify
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='APPEND') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql1 % (database_name, table_name), sql2=sql1 % (check_db, baseall_tb)) |
| # enable batch delete and verify
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__' |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__' |
| # add a column and verify
| v_add = [('k_add', 'INT', '', '0')] |
| ret = client.schema_change_add_column(table_name, v_add, is_wait_job=True) |
| assert ret |
| ret = client.desc_table(table_name, is_all=True) |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert len(hidden_column) == 2, 'expect base table & mv have __DORIS_DELETE_SIGN__' |
| hidden_column = util.get_attr_condition_list(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| assert len(hidden_column) == 1, 'expect base table has __DORIS_VERSION_COL__' |
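| # with show_hidden_columns on, select * now also returns k_add (default 0),
| # __DORIS_DELETE_SIGN__ (0) and what is assumed to be __DORIS_VERSION_COL__ (2)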
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 0, 2 from %s.%s order by k1' \ |
| % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| # load and verify
| column_list = DATA.baseall_column_name_list |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, column_name_list=column_list, |
| merge_type='MERGE', delete_on_predicates='k1 > 0') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| assert client.set_variables('show_hidden_columns', 0) |
| ret = client.select_all(table_name) |
| assert ret == () |
| client.clean(database_name) |
| |
| |
| def test_drop_column_delete_load(): |
| """ |
| { |
| "title": "test_drop_column_delete_load", |
| "describe": "带rollup的表,减列不影响delete load,删除隐藏列,预期失败", |
| "tag": "p1,system,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| # create table
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_unique_key, |
| enable_unique_key_merge_on_write="false") |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name), 'can not get table: %s' % table_name |
| # load |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='APPEND') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' |
| common.check2(client, sql1=sql1 % (database_name, table_name), sql2=sql1 % (check_db, baseall_tb)) |
| # enable |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_VERSION_COL__') |
| # rollup |
| rollup_list = ['k1', 'k2', 'k3', 'k4', 'k5', 'k7', 'k6', 'k10', 'k11'] |
| ret = client.create_rollup_table(table_name, index_name, rollup_list, is_wait=True) |
| assert ret |
| assert client.get_index(table_name, index_name) |
| # drop column. Can not drop key column in Unique data model table |
| ret = client.schema_change_drop_column(table_name, ['k9'], is_wait_job=True) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, 0, 2 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| # merge load |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, |
| column_name_list=DATA.baseall_column_name_list, |
| merge_type='MERGE', delete_on_predicates='k6 in ("false")') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret |
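| # rows with k6 = "false" are marked deleted by the merge but remain visible
| # because show_hidden_columns is still on: their sign shows 1 and the
| # version is assumed to advance to 3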
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k6 in ("false"), 3 ' \ |
| 'from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_add_drop_partition_delete_load(): |
| """ |
| { |
| "title": "test_delete_merge_duplicate_data", |
| "describe": "加减分区不影响批量删除功能", |
| "tag": "p1,system" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| partition_info = palo_client.PartitionInfo("k1", |
| ["p1", "p2", "p3"], |
| ["-10", "0", "10"]) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| partition_info=partition_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| # load into the empty partitioned table: rows with k1 >= 10 match no
| # partition and are filtered, hence max_filter_ratio; TODO: set
| # show_hidden_columns to view the table's hidden deleted rows
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.5) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k1 < 10 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.add_partition(table_name, 'p4', 20), 'add partition failed' |
| assert client.get_partition(table_name, 'p4') |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='MERGE', |
| delete_on_predicates='k1 < 10', |
| partition_list=['p1', 'p2', 'p3', 'p4']) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True) |
| assert ret, 'broker load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k1 >= 10 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.drop_partition(table_name, 'p3') |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name, merge_type='delete') |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.8) |
| assert ret, 'broker load failed' |
| ret = client.select_all(table_name) |
| assert ret == () |
| client.clean(database_name) |
| |
| |
| def test_delete_and_delete_load(): |
| """ |
| { |
| "title": "test_delete_and_delete_load", |
| "describe": "delete & truncate table,关注show_hiden_column模式下,数据的正确性", |
| "tag": "p1,system" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| partition_info = palo_client.PartitionInfo("k1", |
| ["p1", "p2", "p3", "p4"], |
| ["-10", "0", "10", "20"]) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| partition_info=partition_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete="k1 = 1") |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k1 != 1 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| assert client.set_variables('show_hidden_columns', 1) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1=1, 2 from %s.%s order by k1' \ |
| % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.delete(table_name, 'k1=1', 'p3') |
| assert ret, 'delete failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1=1, 2 from %s.%s ' \ |
| 'where k1 != 1 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.delete(table_name, 'k1>1', 'p4') |
| assert ret, 'delete failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, k1=1, 2 from %s.%s ' \ |
| 'where k1 != 1 and k1 < 10 order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
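| # a DELETE-type stream load marks every key in the file deleted: the sign
| # goes to 1 for all rows and the version is assumed to advance to 4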
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='delete') |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 1, 4 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| assert client.truncate(table_name), 'truncate table failed' |
| ret = client.select_all(table_name) |
| assert ret == () |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='MERGE', delete="k1 = 1") |
| assert ret, 'stream load failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select *, k1=1, 2 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_batch_delete_insert(): |
| """ |
| { |
| "title": "test_batch_delete_insert", |
| "describe": "开启删除导入,insert select & insert value数据成功,数据正确", |
| "tag": "p1,system" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| partition_info = palo_client.PartitionInfo("k1", |
| ["p1", "p2", "p3", "p4"], |
| ["-10", "0", "10", "20"]) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| partition_info=partition_info, |
| keys_desc=DATA.baseall_unique_key, set_null=True) |
| assert ret, 'create table failed' |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.insert_select(table_name, 'select * from %s.%s' % (check_db, baseall_tb)) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 2 from %s.%s order by k1' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| sql = 'insert into %s values(null, null, null, null, null, null, null, null, null, null, null)' % table_name |
| ret = client.execute(sql) |
| assert ret == (), 'insert values failed' |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='merge', delete='k1 is null') |
| assert ret, 'stream load failed' |
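| # the delete predicate is evaluated per loaded row: no row in the file has a
| # NULL k1, so nothing is marked deleted; the earlier NULL-key row keeps its
| # sign 0 / version 2 while the reloaded rows move to version 3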
| sql2 = 'select null, null, null, null, null, null, null, null, null, null, null, 0, 2 ' \ |
| 'union select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 0, 3 from %s.%s' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2, forced=True) |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='delete') |
| assert ret, 'stream load failed' |
| sql2 = 'select null, null, null, null, null, null, null, null, null, null, null, 0, 2 ' \ |
| 'union select k1, k2, k3, k4, k5, k6, k10, k11, k7, k8, k9, 1, 4 from %s.%s' % (check_db, baseall_tb) |
| common.check2(client, sql1=sql1, sql2=sql2, forced=True) |
| client.clean(database_name) |
| |
| |
| def test_batch_delete_some_times(): |
| """ |
| { |
| "title": "test_batch_delete_some_times", |
| "describe": "多次执行导入删除,验证最后结果正确", |
| "tag": "p1,system,stability" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| partition_info = palo_client.PartitionInfo("k1", |
| ["p1", "p2", "p3", "p4"], |
| ["-10", "0", "10", "20"]) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| partition_info=partition_info, |
| keys_desc=DATA.baseall_unique_key) |
| assert ret, 'create table failed' |
| load_data_desc = palo_client.LoadDataInfo(FILE.baseall_hdfs_file, table_name) |
| ret = client.batch_load(util.get_label(), load_data_desc, broker=broker_info, is_wait=True, max_filter_ratio=0.5) |
| assert ret |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name)
| sql2 = 'select * from %s.%s order by k1' % (check_db, baseall_tb)
| common.check2(client, sql1=sql1, sql2=sql2)
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
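| # each pass re-merges the whole file with a moving delete threshold; on the
| # final pass (i = 19) no k1 exceeds it, so every key ends up visible and the
| # table should match baseall exactly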
| for i in range(20): |
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='merge', delete="k1 > %s" % i) |
| assert ret, 'stream load failed' |
| time.sleep(3) |
| common.check2(client, sql1=sql1, sql2=sql2)
| client.clean(database_name) |
| |
| |
| def test_delete_merge_limitation(): |
| """ |
| { |
| "title": "test_delete_merge_limitation", |
| "describe": "验证delete load的限制", |
| "tag": "p1,function,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.baseall_column_no_agg_list, |
| distribution_info=DATA.baseall_distribution_info, |
| keys_desc=DATA.baseall_duplicate_key) |
| assert ret, 'create table failed' |
| |
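| # batch delete requires the unique key model: on this duplicate key table
| # both the DELETE-type load and the enable statement are expected to fail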
| ret = client.stream_load(table_name, FILE.baseall_local_file, merge_type='DELETE') |
| assert not ret |
| msg = 'Batch delete only supported in unique tables.' |
| util.assert_return(False, msg, client.enable_feature_batch_delete, table_name) |
| client.clean(database_name) |
| |
| |
| def test_delete_merge_duplicate_data(): |
| """ |
| { |
| "title": "test_delete_merge_duplicate_data", |
| "describe": "验证delete load的文件中,当key相同的时候以最后出现的key为准,delete on为value,验证结果正确", |
| "tag": "p1,function,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| LOG.info(L('', database_name=database_name, table_name=table_name, index_name=index_name)) |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, DATA.tinyint_column_no_agg_list, |
| distribution_info=DATA.hash_distribution_info, |
| keys_desc='UNIQUE KEY(k1)') |
| assert ret, 'create table failed' |
| assert client.show_tables(table_name) |
| try: |
| ret = client.enable_feature_batch_delete(table_name) |
| assert ret, 'enable batch delete feature failed' |
| except Exception as e: |
| pass |
| assert client.set_variables('show_hidden_columns', 1) |
| ret = client.desc_table(table_name, is_all=True) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfoAll.Field, '__DORIS_DELETE_SIGN__') |
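| # test_tinyint_file is assumed to contain duplicate k1 keys: in the unique
| # model the last occurrence of a key within one load wins, so the second
| # load's predicate (v1=3) decides each key's final delete sign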
| ret = client.stream_load(table_name, FILE.test_tinyint_file, max_filter_ratio=0.1, |
| merge_type='merge', delete='v1=1') |
| assert ret, 'stream load failed' |
| ret = client.stream_load(table_name, FILE.test_tinyint_file, max_filter_ratio=0.1, |
| merge_type='merge', delete='v1=3') |
| assert ret, 'stream load failed' |
| assert client.set_variables('show_hidden_columns', 0) |
| ret = client.select_all(table_name) |
| assert ret == () |
| client.clean(database_name) |
| |
| |
| if __name__ == '__main__': |
| setup_module() |
| |
| |