| #!/bin/env python |
| # -*- coding: utf-8 -*- |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| ############################################################################ |
| # |
| # @file test_array_alter.py |
| # @date 2022-08-15 11:09:53 |
| # @brief This file is a test file for array type. |
| # |
| ############################################################################# |
| """ |
| test_array_alter.py |
| """ |
| import sys |
| import os |
| import time |
| import pytest |
| file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) |
| sys.path.append(file_dir) |
| file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) |
| sys.path.append(file_dir) |
| |
| from lib import palo_config |
| from lib import palo_client |
| from lib import palo_job |
| from lib import util |
| from lib import common |
| from data import schema as SCHEMA |
| from data import load_file as FILE |
| |
| config = palo_config.config |
| broker_info = palo_config.broker_info |
| |
| check_db = 'array_check_db' |
| check_tb = 'array_tb' |
| |
| |
| def setup_module(): |
| """setup""" |
| client = common.get_client() |
| ret = client.show_variables('enable_vectorized_engine') |
| if len(ret) == 1 and ret[0][1] == 'false': |
| raise pytest.skip('skip if enable_vectorized_engine is false') |
| |
| ret = client.admin_show_config('enable_array_type') |
| assert len(ret) == 1, 'get enable_array_type config error' |
| value = palo_job.AdminShowConfig(ret[0]).get_value() |
| if value != 'true': |
| client.set_frontend_config('enable_array_type', 'true') |
| if len(client.show_databases(check_db)) == 0: |
| init_check() |
| assert client.verify(FILE.expe_array_table_file, check_tb, database_name=check_db) |
| |
| |
| def init_check(): |
| """init check db & tb""" |
| client = common.create_workspace(check_db) |
| ret = client.create_table(check_tb, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(check_tb, FILE.test_array_table_local_file, column_separator='|') |
| assert ret, 'load chech table failed' |
| |
| |
| def teardown_module(): |
| """teardown""" |
| pass |
| |
| |
| def test_add_array_column(): |
| """ |
| { |
| "title": "test_add_array_column", |
| "describe": "array类型,增加array类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|') |
| assert ret, 'stream load failed' |
| column_name_list = util.get_attr(SCHEMA.array_table_list, 0) |
| |
| column_list = [('add_arr', 'array<int>', None, "[]")] |
| ret = client.schema_change_add_column(table_name, column_list, is_wait_job=True, set_null=True) |
| assert ret, 'add column failed' |
| |
| column_list = [('add_arr', 'array<int>', None, None)] |
| msg = 'Can not set null default value to non nullable column: add_arr' |
| ret = util.assert_return(False, msg, client.schema_change_add_column, table_name, column_list, set_null=False) |
| |
| column_list = [('add_arr', 'array<int>', 'key', None)] |
| msg = 'Array can only be used in the non-key column of the duplicate table at present.' |
| ret = util.assert_return(False, msg, client.schema_change_add_column, table_name, column_list, set_null=True) |
| |
| column_list = [('add_arr1', 'array<int>', None, None)] |
| ret = client.schema_change_add_column(table_name, column_list, is_wait_job=True, set_null=True) |
| assert ret, 'add column failed' |
| |
| ret = client.desc_table(table_name) |
| assert 'ARRAY<INT(11)>' == util.get_attr_condition_value(ret, palo_job.DescInfo.Field, 'add_arr1', |
| palo_job.DescInfo.Type) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select *, [], null from %s.%s order by k1' % (check_db, check_tb) |
| common.check2(client, sql1, sql2=sql2) |
| |
| column_name_list.append('add_arr=a4') |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', |
| column_name_list=column_name_list) |
| assert ret, 'stream load failed' |
| time.sleep(5) |
| sql1 = 'select count(*) from %s.%s' % (database_name, table_name) |
| sql2 = 'select count(*) * 2 from %s.%s' % (check_db, check_tb) |
| common.check2(client, sql1, sql2=sql2) |
| |
| client.clean(database_name) |
| |
| |
| def test_drop_array_column(): |
| """ |
| { |
| "title": "test_drop_array_column", |
| "describe": "array类型,删除array类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|') |
| assert ret, 'stream load failed' |
| column_name_list = util.get_attr(SCHEMA.array_table_list, 0) |
| ret = client.schema_change_drop_column(table_name, ['a1', 'a2', 'a7', 'a11', 'a13'], is_wait_job=True) |
| assert ret, 'drop column failed' |
| # check |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select k1, a3, a4, a5, a6, a8, a9, a10, a12, a14 from %s.%s order by k1' % (check_db, check_tb) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.desc_table(table_name) |
| assert util.get_attr_condition_value(ret, palo_job.DescInfo.Field, 'a1', palo_job.DescInfo.Type) is None |
| assert util.get_attr_condition_value(ret, palo_job.DescInfo.Field, 'a2', palo_job.DescInfo.Type) is None |
| assert util.get_attr_condition_value(ret, palo_job.DescInfo.Field, 'a7', palo_job.DescInfo.Type) is None |
| assert util.get_attr_condition_value(ret, palo_job.DescInfo.Field, 'a11', palo_job.DescInfo.Type) is None |
| assert util.get_attr_condition_value(ret, palo_job.DescInfo.Field, 'a13', palo_job.DescInfo.Type) is None |
| |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', |
| column_name_list=column_name_list) |
| assert ret, 'stream load failed' |
| time.sleep(5) |
| sql1 = 'select count(*) from %s.%s' % (database_name, table_name) |
| sql2 = 'select count(*) * 2 from %s.%s' % (check_db, check_tb) |
| common.check2(client, sql1, sql2=sql2) |
| |
| client.clean(database_name) |
| |
| |
| def test_modify_array_column_not_support(): |
| """ |
| { |
| "title": "test_modify_array_column_not_support", |
| "describe": "不支持修改array类型", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_boolean_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| msg = 'Can not change ARRAY to STRING' |
| util.assert_return(False, msg, client.schema_change_modify_column, table_name, 'k2', 'string') |
| msg = 'Can not change ARRAY to ARRAY' |
| util.assert_return(False, msg, client.schema_change_modify_column, table_name, 'k2', 'array<string>') |
| client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_boolean(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_boolean", |
| "describe": "array类型,修改array<boolean>类型列为其他array子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_boolean_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_boolean_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_boolean_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_boolean_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_boolean_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_tinyint(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_tinyint", |
| "describe": "array类型,修改array<tinyint>类型列为其他array子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_tinyint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_tinyint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_tinyint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_tinyint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_tinyint_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_smallint(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_smallint", |
| "describe": "array类型,修改array<smallint>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| column_type_list = ['int', 'bigint'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_smallint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_smallint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_smallint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_smallint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_smallint_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_int(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_int", |
| "describe": "array类型,修改array<int>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_int_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_int_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_int_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_int_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_int_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_bigint(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_bigint", |
| "describe": "array类型,修改array<bigint>类型列为其他array子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_bigint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_bigint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_bigint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_bigint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_bigint_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_largeint(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_largeint", |
| "describe": "array类型,修改array<largeint>类型列为其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_largeint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_largeint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_largeint_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_largeint_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_largeint_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_decimal(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_decimal", |
| "describe": "array类型,修改array<decimal>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_decimal_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_decimal_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_decimal_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_decimal_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_decimal_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_double(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_double", |
| "describe": "array类型,修改array<double>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_double_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_double_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_double_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_double_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_double_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_float(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_float", |
| "describe": "array类型,修改array<double>类型列为其他array子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'date', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_float_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_float_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_float_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_float_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_float_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_date(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_date", |
| "describe": "array类型,修改array<date>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', |
| 'datetime', 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_date_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_date_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_date_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_date_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_date_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_datetime(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_datetime", |
| "describe": "array类型,修改array<datetime>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'char(10)', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_datetime_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_datetime_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_datetime_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_datetime_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_datetime_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_char(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_char", |
| "describe": "array类型,修改array<char>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'varchar(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_char_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_char_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_char_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_char_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_char_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_varchar(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_varchar", |
| "describe": "array类型,修改array<varchar>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'string'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_varchar_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_varchar_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_varchar_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_varchar_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_varchar_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| @pytest.mark.skip() |
| def test_modify_array_column_type_array_string(): |
| """ |
| { |
| "title": "test_modify_array_column_type_array_string", |
| "describe": "array类型,修改array<string>类型列为array其他子类型列", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_type_list = ['boolean', 'tinyint', 'smallint', 'int', 'bigint', 'largeint', |
| 'deical(9, 3)', 'double', 'float', 'date', |
| 'datetime', 'char(10)', 'varchar(10)'] |
| |
| ret = client.create_table(table_name, SCHEMA.array_string_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_string_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| c_type = 'array<%s>' |
| k = 'k2' |
| for sub_type in column_type_list: |
| tb = table_name[-50:] + '_' + sub_type.split('(')[0] |
| ret = client.create_table(tb, SCHEMA.array_string_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(tb, FILE.test_array_string_local_file, max_filter_ratio=0.01) |
| assert ret, 'stream load failed' |
| ret = client.schema_change_modify_column(tb, k, c_type % sub_type, is_wait_job=True) |
| assert ret, 'modify table failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, tb) |
| sql2 = 'select k1, cast(%s as %s) from %s.%s order by k1' % (k, sub_type, database_name, table_name) |
| common.check2(client, sql1, sql2=sql2) |
| ret = client.stream_load(tb, FILE.test_array_string_local_file, max_filter_ratio=0.01) |
| assert ret, 'after modify stream load failed' |
| sql1 = 'select count(*) from %s.%s' % (database_name, tb) |
| sql2 = 'select count(*) * 2 from %s.%s' % (database_name, table_name) |
| print(client.execute(sql1)) |
| # common.check2(client, sql1, sql2=sql2) |
| # client.clean(database_name) |
| |
| |
| def test_modify_array_column_order(): |
| """ |
| { |
| "title": "test_modify_array_column_order", |
| "describe": "array类型,修改表的array类型列的顺序", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|') |
| assert ret, 'stream load failed' |
| column_name_list = ['k1', 'a14', 'a13', 'a12', 'a11', 'a10', 'a9', |
| 'a8', 'a7', 'a6', 'a5', 'a4', 'a3', 'a2', 'a1'] |
| ret = client.schema_change_order_column(table_name, column_name_list, is_wait_job=True) |
| assert ret, 'modify column order failed' |
| ret = client.desc_table(table_name) |
| actual_column = util.get_attr(ret, palo_job.DescInfo.Field) |
| assert actual_column == column_name_list, 'column order error, expect %s, actual %s' \ |
| % (column_name_list, actual_column) |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select %s from %s.%s order by k1' % (', '.join(column_name_list), check_db, check_tb) |
| common.check2(client, sql1, sql2=sql2) |
| |
| column_name_list = util.get_attr(SCHEMA.array_table_list, 0) |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', |
| column_name_list=column_name_list) |
| assert ret, 'stream load failed' |
| time.sleep(5) |
| sql1 = 'select count(*) from %s.%s' % (database_name, table_name) |
| sql2 = 'select count(*) * 2 from %s.%s' % (check_db, check_tb) |
| common.check2(client, sql1, sql2=sql2) |
| client.clean(database_name) |
| |
| |
| def test_array_bloom_filter(): |
| """ |
| { |
| "title": "test_array_bloom_filter", |
| "describe": "array类型列作为bloom filter, 报错", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| bf_column = ['k1', 'a2'] |
| msg = 'ARRAY is not supported in bloom filter index. invalid column: a2' |
| util.assert_return(False, msg, client.create_table, table_name, SCHEMA.array_table_list, |
| keys_desc=SCHEMA.duplicate_key, bloom_filter_column_list=bf_column) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|') |
| assert ret, 'stream load failed' |
| util.assert_return(False, msg, client.schema_change, table_name, bloom_filter_column_list=bf_column) |
| client.clean(database_name) |
| |
| |
| def test_array_mv(): |
| """ |
| { |
| "title": "test_array_mv", |
| "describe": "array类型不支持创建物化视图, k1, a2, group by, order by, sum/min/max", |
| "tag": "function,p0,fuzz" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| column_list = [('k0', 'int')] + SCHEMA.array_table_list |
| column_name_list = util.get_attr(SCHEMA.array_table_list, 0) + ['k0=k1+1'] |
| ret = client.create_table(table_name, column_list, keys_desc='DUPLICATE KEY(k0, k1)') |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|', |
| column_name_list=column_name_list) |
| assert ret, 'stream load failed' |
| # array, without order by/group by |
| index_name = index_name + '1' |
| sql = 'select k1, a2 from %s order by k1' % table_name |
| msg = ' The ARRAY column[`mv_a2` array<tinyint(4)> NOT NULL] not support to create materialized view' |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| index_name = index_name + '2' |
| sql = 'select k1, a3 from %s' % table_name |
| msg = 'The ARRAY column[`mv_a3` array<smallint(6)> NOT NULL] not support to create materialized view' |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| |
| index_name = 'err_idx' |
| # order by array |
| sql = 'select k1, a4 from %s order by k1, a4' % table_name |
| msg = "must use with specific function, and don't support filter or group by." |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # group by array |
| sql = 'select k1, a5 from %s group by k1, a5' % table_name |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # count array |
| sql = 'select k1, count(a6) from %s group by k1' % table_name |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # sum array |
| msg_s = 'sum requires a numeric parameter: sum(`a7`)' |
| sql = 'select k1, sum(a7) from %s group by k1' % table_name |
| util.assert_return(False, msg_s, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # max array |
| sql = 'select k1, max(a8) from %s group by k1' % table_name |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # min array |
| sql = 'select k1, min(a9) from %s group by k1' % table_name |
| util.assert_return(False, msg, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # bitmap_union |
| msg_b = 'No matching function with signature: to_bitmap_with_check(array<date>).' |
| sql = 'select k1, bitmap_union(to_bitmap(a10)) from %s group by k1' % table_name |
| util.assert_return(False, msg_b, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| # hll_union |
| msg_h = 'No matching function with signature: hll_hash(array<datetime>).' |
| sql = 'select k1, hll_union_agg(hll_hash(a11)) from %s group by k1' % table_name |
| util.assert_return(False, msg_h, client.create_materialized_view, table_name, index_name, sql, is_wait=True) |
| client.clean(database_name) |
| |
| |
| def test_array_export(): |
| """ |
| { |
| "title": "test_array_export", |
| "describe": "array类型,导出成功", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|') |
| assert ret, 'stream load failed' |
| property_dict = {'column_separator': '|'} |
| ret = client.export(table_name, FILE.export_to_hdfs_path, broker_info=broker_info, property_dict=property_dict) |
| assert ret, 'export failed' |
| client.wait_export() |
| ret = client.show_export(state="FINISHED") |
| assert len(ret) == 1 |
| client.clean(database_name) |
| |
| |
| def test_array_outfile_csv(): |
| """ |
| { |
| "title": "test_array_outfile_csv", |
| "describe": "array类型,查询结果导出为csv", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name, column_terminator='|') |
| ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, max_filter_ratio=0.01, is_wait=True) |
| assert ret, 'broker load failed' |
| |
| sql = 'select * from %s' % table_name |
| csv_output_path = palo_config.gen_remote_file_path('export/%s/%s/%s') % (database_name, util.get_label(), |
| util.get_label()) |
| ret = client.select_into(sql, csv_output_path, broker_info, format_as='csv') |
| print(ret) |
| assert ret, 'select into csv failed' |
| |
| csv_check_table = table_name + '_csv' |
| csv_load_file = str(palo_job.SelectIntoInfo(ret[0]).get_url() + '*') |
| ret = client.create_table(csv_check_table, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| load_data_list = palo_client.LoadDataInfo(csv_load_file, csv_check_table) |
| ret = client.batch_load(util.get_label(), load_data_list, broker=broker_info, is_wait=True) |
| assert ret, 'csv outfile load failed' |
| |
| for k in (1, 2, 3, 4, 5, 6, 7, 10, 11, 12, 13, 14): |
| sql1 = 'select a%s from %s order by k1' % (k, table_name) |
| sql2 = 'select a%s from %s order by k1' % (k, csv_check_table) |
| common.check2(client, sql1=sql1, sql2=sql2, forced=True) |
| # array double & float 精度与原数据不一致,使用array_sum用作校验 |
| for k in (8, 9): |
| sql1 = 'select size(a%s), array_sum(a%s) from %s order by k1' % (k, k, table_name) |
| sql2 = 'select size(a%s), array_sum(a%s) from %s order by k1' % (k, k, csv_check_table) |
| common.check2(client, sql1=sql1, sql2=sql2, forced=True) |
| client.clean(database_name) |
| |
| |
| def test_array_outfile_format(): |
| """ |
| { |
| "title": "test_array_outfile_format", |
| "describe": "array类型,导出格式测试,array不支持导出为parquet格式", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name, column_terminator='|') |
| ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, max_filter_ratio=0.01, is_wait=True) |
| assert ret, 'broker load failed' |
| |
| sql = 'select * from %s' % table_name |
| csv_output_path = palo_config.gen_remote_file_path('export/%s/%s/%s') % (database_name, util.get_label(), |
| util.get_label()) |
| ret = client.select_into(sql, csv_output_path, broker_info, format_as='csv_with_names') |
| print(ret) |
| assert ret, 'select into csv failed' |
| |
| csv_output_path = palo_config.gen_remote_file_path('export/%s/%s/%s') % (database_name, util.get_label(), |
| util.get_label()) |
| ret = client.select_into(sql, csv_output_path, broker_info, format_as='csv_with_names_and_types') |
| print(ret) |
| assert ret, 'select into csv failed' |
| |
| # not support outfile parquet |
| parquet_output_path = palo_config.gen_remote_file_path('export/%s/%s/%s') % (database_name, util.get_label(), |
| util.get_label()) |
| msg = 'currently parquet do not support column type: ARRAY' |
| util.assert_return(False, msg, client.select_into, sql, parquet_output_path, broker_info, format_as='parquet') |
| client.clean(database_name) |
| |
| |
| def test_array_delete(): |
| """ |
| { |
| "title": "test_array_delete", |
| "describe": "array类型删除,删除后数据正确", |
| "tag": "function,p0" |
| } |
| """ |
| database_name, table_name, index_name = util.gen_name_list() |
| client = common.create_workspace(database_name) |
| ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key) |
| assert ret, 'create table failed' |
| load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name, column_terminator='|') |
| ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, max_filter_ratio=0.01, is_wait=True) |
| assert ret, 'broker load failed' |
| ret = client.delete(table_name, 'k1 > 0') |
| assert ret, 'delete failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k1<= 0 order by k1' % (check_db, check_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.delete(table_name, 'a1 is null') |
| assert ret, 'delete failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k1<= 0 order by k1' % (check_db, check_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| ret = client.delete(table_name, 'a1 is not null and k1 > -10') |
| assert ret, 'delete failed' |
| sql1 = 'select * from %s.%s order by k1' % (database_name, table_name) |
| sql2 = 'select * from %s.%s where k1<= -10 order by k1' % (check_db, check_tb) |
| common.check2(client, sql1=sql1, sql2=sql2) |
| client.truncate(table_name) |
| ret = client.select_all(table_name) |
| assert ret == (), 'expect empty table' |
| client.clean(database_name) |
| |
| |
| if __name__ == '__main__': |
| setup_module() |
| # todo modify |