#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_array_ddl.py
# @date 2022-08-15 11:09:53
# @brief Test cases for the array type.
#
# BOOLEAN, TINYINT, SMALLINT, INT, BIGINT, LARGEINT, FLOAT, DOUBLE, DECIMAL, DATE, DATETIME, CHAR, VARCHAR, STRING
# 4 load methods: insert / stream / broker / routine load
# 4 load formats: csv, json, parquet, orc
# json load parameters
# max_filter_ratio
# set column
#############################################################################
"""
test_array_load.py
array routine load case in test_routine_load_property.py test_array_load
"""
import sys
import os
import time
import pytest
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.append(file_dir)
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(file_dir)
from lib import palo_config
from lib import palo_client
from lib import palo_job
from lib import util
from lib import common
from data import schema as SCHEMA
from data import load_file as FILE
from lib import palo_types
config = palo_config.config
broker_info = palo_client.BrokerInfo(config.broker_name, config.broker_property)
def setup_module():
"""enable array"""
client = common.get_client()
ret = client.show_variables('enable_vectorized_engine')
if len(ret) == 1 and ret[0][1] == 'false':
pytest.skip('enable_vectorized_engine is false')
ret = client.admin_show_config('enable_array_type')
assert len(ret) == 1, 'get enable_array_type config error'
value = palo_job.AdminShowConfig(ret[0]).get_value()
if value != 'true':
client.set_frontend_config('enable_array_type', 'true')
print(len(client.get_alive_backend_list()))
def teardown_module():
"""tearDown"""
client = common.get_client()
print(len(client.get_alive_backend_list()))
def load_check(db, tb, schema, local_file, hdfs_file, expect_file, **kwargs):
"""
1. create db
2. create tb & stream load, skip if no local file
3. create tb & broker load, skip if no hdfs file
4. check load result with expect file
5. drop db
**kwargs pass to stream load & broker load
"""
client = common.create_workspace(db)
stream_tb = tb + '_stream'
broker_tb = tb + '_broker'
k2_type = util.get_attr_condition_value(schema, 0, 'k2', 1)
be_checked_table = list()
# stream load
if local_file is not None:
ret = client.create_table(stream_tb, schema, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
ret = client.stream_load(stream_tb, local_file, max_filter_ratio=0.01, **kwargs)
assert ret, 'stream load failed'
be_checked_table.append(stream_tb)
time.sleep(5)  # brief wait for the loaded data to become visible
# broker load
if hdfs_file is not None:
ret = client.create_table(broker_tb, schema, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
load_data_info = palo_client.LoadDataInfo(hdfs_file, broker_tb, **kwargs)
ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info,
max_filter_ratio=0.01, is_wait=True)
assert ret, 'broker load failed'
be_checked_table.append(broker_tb)
# check
if local_file and hdfs_file:
sql1 = 'select * from %s order by k1' % stream_tb
sql2 = 'select * from %s order by k1' % broker_tb
common.check2(client, sql1, sql2=sql2)
for tb in be_checked_table:
if 'DOUBLE' in k2_type.upper() or 'FLOAT' in k2_type.upper():
common.check_by_file(expect_file, table_name=tb, client=client, k2=palo_types.ARRAY_DOUBLE)
elif 'DECIMAL' in k2_type.upper():
common.check_by_file(expect_file, table_name=tb, client=client, k2=palo_types.ARRAY_DECIMAL)
else:
common.check_by_file(expect_file, table_name=tb, client=client)
client.clean(db)
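# A typical call, mirroring the per-type tests below (sketch): extra kwargs such
# as format='json' or strip_outer_array=True are forwarded to both stream load
# and broker load, e.g.
#   load_check(database_name, table_name, SCHEMA.array_int_list,
#              FILE.test_array_int_local_file, FILE.test_array_int_remote_file,
#              FILE.expe_array_int_file)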
def test_array_boolean():
"""
{
"title": "test_array_boolean",
"describe": "array类型中,子类型为boolean,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_boolean_list
local_file = FILE.test_array_boolean_local_file
hdfs_file = FILE.test_array_boolean_remote_file
expect_file = FILE.expe_array_boolean_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_boolean_json():
"""
{
"title": "test_array_boolean_json",
"describe": "array类型中,子类型为boolean,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_boolean_list
local_file = FILE.test_array_boolean_local_json
expect_file = FILE.expe_array_boolean_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_boolean_insert():
"""
{
"title": "test_array_boolean_insert",
"describe": "array类型中,子类型为boolean,insert导入,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_boolean_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert array data
sql_list = list()
sql_list.append("insert into %s values (1000, ['1', '0'])" % table_name)
sql_list.append("insert into %s values (1001, ['true', 'false'])" % table_name)
sql_list.append("insert into %s values (1002, [2])" % table_name)
sql_list.append("insert into %s values (1003, [NULL])" % table_name)
sql_list.append("insert into %s values (1004, NULL)" % table_name)
sql_list.append("insert into %s values (1005, ['Abc'])" % table_name)
sql_list.append("insert into %s values (1006, [1.234])" % table_name)
sql_list.append("insert into %s values (1007, '1,2,3')" % table_name)
sql_list.append("insert into %s values (1008, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
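# expected coercions: '1'/'true' -> 1 and '0'/'false' -> 0; non-zero numeric
# literals (2, 1.234) cast to 1; a non-boolean string ('Abc') becomes NULL;
# the bare string '1,2,3' yields NULL for the whole array, while '[1,2,3]' is
# parsed element-wise with non-0/1 elements becoming NULL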
expect = ((1000, u'[1, 0]'), (1001, u'[1, 0]'), (1002, u'[1]'),
(1003, u'[NULL]'), (1004, None), (1005, u'[NULL]'),
(1006, u'[1]'), (1007, None), (1008, u'[1, NULL, NULL]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_tinyint():
"""
{
"title": "test_array_tinyint",
"describe": "array类型中,子类型为tinyint,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_tinyint_list
local_file = FILE.test_array_tinyint_local_file
hdfs_file = FILE.test_array_tinyint_remote_file
expect_file = FILE.expe_array_tinyint_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_tinyint_json():
"""
{
"title": "test_array_tinyint_json",
"describe": "array类型中,子类型为tinyint,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_tinyint_list
local_file = FILE.test_array_tinyint_local_json
expect_file = FILE.expe_array_tinyint_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_tinyint_insert():
"""
{
"title": "test_array_tinyint_insert",
"describe": "array类型中,子类型为tinyint,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_tinyint_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name)
sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name)
sql_list.append("insert into %s values (1004, NULL)" % table_name)
err_sql_list.append("insert into %s values (1005, [-129])" % table_name)
err_sql_list.append("insert into %s values (1006, [128])" % table_name)
sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name)
err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name)
sql_list.append("insert into %s values (1009, '1,2,3')" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'), (1003, u'[1, 2]'),
(1004, None),
(1007, u'[123, 23]'), (1009, None), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_smallint():
"""
{
"title": "test_array_smallint",
"describe": "array类型中,子类型为smallint,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_smallint_list
local_file = FILE.test_array_smallint_local_file
hdfs_file = FILE.test_array_smallint_remote_file
expect_file = FILE.expe_array_smallint_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_smallint_json():
"""
{
"title": "test_array_smallint_json",
"describe": "array类型中,子类型为smallint,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_smallint_list
local_file = FILE.test_array_smallint_local_json
expect_file = FILE.expe_array_smallint_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_smallint_insert():
"""
{
"title": "test_array_smallint_insert",
"describe": "array类型中,子类型为smallint,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_smallint_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name)
sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name)
sql_list.append("insert into %s values (1004, NULL)" % table_name)
err_sql_list.append("insert into %s values (1005, [-32769])" % table_name)
err_sql_list.append("insert into %s values (1006, [32768])" % table_name)
sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name)
err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name)
sql_list.append("insert into %s values (1009, '1,2,3')" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'),
(1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'),
(1009, None), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_int():
"""
{
"title": "test_array_int",
"describe": "array类型中,子类型为int,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_int_list
local_file = FILE.test_array_int_local_file
hdfs_file = FILE.test_array_int_remote_file
expect_file = FILE.expe_array_int_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_int_json():
"""
{
"title": "test_array_int_json",
"describe": "array类型中,子类型为intt,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_int_list
local_file = FILE.test_array_int_local_json
expect_file = FILE.expe_array_int_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_int_insert():
"""
{
"title": "test_array_int_insert",
"describe": "array类型中,子类型为int,insert导入查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_int_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name)
sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name)
sql_list.append("insert into %s values (1004, NULL)" % table_name)
err_sql_list.append("insert into %s values (1005, [-2147483649])" % table_name)
err_sql_list.append("insert into %s values (1006, [2147483648])" % table_name)
sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name)
err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name)
sql_list.append("insert into %s values (1009, '1,2,3')" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
err_sql_list.append("insert into %s values (1005, [-9223372036854775809])" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'),
(1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'),
(1009, None), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_bigint():
"""
{
"title": "test_array_bigint",
"describe": "array类型中,子类型为bigint,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_bigint_list
local_file = FILE.test_array_bigint_local_file
hdfs_file = FILE.test_array_bigint_remote_file
expect_file = FILE.expe_array_bigint_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_bigint_json():
"""
{
"title": "test_array_bigint_json",
"describe": "array类型中,子类型为bigint,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_bigint_list
local_file = FILE.test_array_bigint_local_json
expect_file = FILE.expe_array_bigint_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_bigint_insert():
"""
{
"title": "test_array_bigint_insert",
"describe": "array类型中,子类型为bigint,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_bigint_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, ['1002', '1002'])" % table_name)
err_sql_list.append("insert into %s values (1003, [1, 'abc'])" % table_name)
sql_list.append("insert into %s values (1004, [1, 3.1415, -0])" % table_name)
sql_list.append("insert into %s values (1005, NULL)" % table_name)
sql_list.append("insert into %s values (1006, '1,2,3')" % table_name)
err_sql_list.append("insert into %s values (1007, [-9223372036854775809])" % table_name)
err_sql_list.append("insert into %s values (1008, [9223372036854775808])" % table_name)
sql_list.append("insert into %s values (1009, [-9223372036854775808, 9223372036854775807])" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
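# expected: the int64 boundary values -9223372036854775808 and
# 9223372036854775807 load as-is while values one past them are rejected
# (err_sql_list); mixed numerics truncate to integers ([1, 3.1415, -0] ->
# [1, 3, 0]) and the bare string '1,2,3' becomes NULL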
expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1002, 1002]'),
(1004, u'[1, 3, 0]'), (1005, None), (1006, None),
(1009, u'[-9223372036854775808, 9223372036854775807]'), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_largeint():
"""
{
"title": "test_array_largeint_json",
"describe": "array类型中,子类型为largeint,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_largeint_list
local_file = FILE.test_array_largeint_local_file
hdfs_file = FILE.test_array_largeint_remote_file
expect_file = FILE.expe_array_largeint_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_largeint_json():
"""
{
"title": "test_array_largeint_json",
"describe": "array类型中,子类型为boolean,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_largeint_list
local_file = FILE.test_array_largeint_local_json
expect_file = FILE.expe_array_largeint_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_largeint_json_num():
"""
{
"title": "test_array_largeint_json",
"describe": "array类型中,子类型为largeint,stream导入json,largeint为数字非字符串,导入成功,结果为Null",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_largeint_list
local_file = FILE.test_array_largeint_local_json_num
expect_file = FILE.expe_array_largeint_json_num_file_null
load_check(database_name, table_name, column_list, local_file, None, expect_file, format='json')
def test_array_largeint_json_num_as_string():
"""
{
"title": "test_array_largeint_json",
"describe": "array类型中,子类型为largeint,stream导入json,设置num_as_string,largeint为数字非字符串,导入成功,结果为Null",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_largeint_list
local_file = FILE.test_array_largeint_local_json_num
expect_file = FILE.expe_array_largeint_json_num_file_real
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', num_as_string='true')
def test_array_largeint_insert():
"""
{
"title": "test_array_largeint_insert",
"describe": "array类型中,子类型为largeint,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_largeint_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, [1, 0, -0])" % table_name)
sql_list.append("insert into %s values (1003, [1.23, 2.34])" % table_name)
sql_list.append("insert into %s values (1004, NULL)" % table_name)
err_sql_list.append("insert into %s values (1005, [-170141183460469231731687303715884105729])" % table_name)
err_sql_list.append("insert into %s values (1006, [170141183460469231731687303715884105728])" % table_name)
sql_list.append("insert into %s values (1007, ['123', '23'])" % table_name)
err_sql_list.append("insert into %s values (1008, ['ABC', '1'])" % table_name)
sql_list.append("insert into %s values (1009, '1,2,3')" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[1, 0, 0]'),
(1003, u'[1, 2]'), (1004, None), (1007, u'[123, 23]'),
(1009, None), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_decimal():
"""
{
"title": "test_array_decimal",
"describe": "array类型中,子类型为decimal,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_decimal_list
local_file = FILE.test_array_decimal_local_file
hdfs_file = FILE.test_array_decimal_remote_file
expect_file = FILE.expe_array_decimal_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_decimal_json():
"""
{
"title": "test_array_decimal_json",
"describe": "array类型中,子类型为decimal,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_decimal_list
local_file = FILE.test_array_decimal_local_json
expect_file = FILE.expe_array_decimal_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_decimal_insert():
"""
{
"title": "test_array_decimal_insert",
"describe": "array类型中,子类型为decimal,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_decimal_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [123, 3, 4])" % table_name)
sql_list.append("insert into %s values (1001, [])" % table_name)
sql_list.append("insert into %s values (1002, [NULL])" % table_name)
sql_list.append("insert into %s values (1003, ['-1.234', '2.3456'])" % table_name)
sql_list.append("insert into %s values (1004, [1.0000000001])" % table_name)
sql_list.append("insert into %s values (1005, [10000000000000000000.1111111222222222])" % table_name)
sql_list.append("insert into %s values (1006, '1,3,4')" % table_name)
err_sql_list.append("insert into %s values (1007, ['ABCD'])" % table_name)
sql_list.append("insert into %s values (1008, NULL)" % table_name)
sql_list.append("insert into %s values (1009, [999999999999999999.9999999])" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[123, 3, 4]'), (1001, u'[]'),
(1002, u'[NULL]'), (1003, u'[-1.234, 2.3456]'),
(1004, u'[1]'), (1005, u'[100000000000000000.001111111]'), # 1005 decimal is wrong, overflow
(1006, None), (1008, None),
(1009, u'[999999999999999999.9999999]'), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_float():
"""
{
"title": "test_array_float",
"describe": "array类型中,子类型为float,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_float_list
local_file = FILE.test_array_float_local_file
hdfs_file = FILE.test_array_float_remote_file
expect_file = FILE.expe_array_float_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_float_json():
"""
{
"title": "test_array_float_json",
"describe": "array类型中,子类型为float,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_float_list
local_file = FILE.test_array_float_local_json
expect_file = FILE.expe_array_float_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_float_insert():
"""
{
"title": "test_array_float_insert",
"describe": "array类型中,子类型为float,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_float_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [123, 3, 4])" % table_name)
sql_list.append("insert into %s values (1001, [])" % table_name)
sql_list.append("insert into %s values (1002, [NULL])" % table_name)
sql_list.append("insert into %s values (1003, ['-1.234', '2.3456'])" % table_name)
sql_list.append("insert into %s values (1004, [1.0000000001])" % table_name)
sql_list.append("insert into %s values (1005, [10000000000000000000.1111111222222222])" % table_name)
sql_list.append("insert into %s values (1006, '1,3,4')" % table_name)
err_sql_list.append("insert into %s values (1007, ['ABCD'])" % table_name)
sql_list.append("insert into %s values (1008, NULL)" % table_name)
sql_list.append("insert into %s values (1009, [999999999999999999.9999999])" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[123, 3, 4]'), (1001, u'[]'), (1002, u'[NULL]'),
(1003, u'[-1.234, 2.3456]'), (1004, u'[1]'), (1005, u'[1e+19]'),
(1006, None), (1008, None), (1009, u'[1e+18]'),
(1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_double():
"""
{
"title": "test_array_double",
"describe": "array类型中,子类型为double,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_double_list
local_file = FILE.test_array_double_local_file
hdfs_file = FILE.test_array_double_remote_file
expect_file = FILE.expe_array_double_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_double_json():
"""
{
"title": "test_array_double_json",
"describe": "array类型中,子类型为double,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_double_list
local_file = FILE.test_array_double_local_json
expect_file = FILE.expe_array_double_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_double_insert():
"""
{
"title": "test_array_double_insert",
"describe": "array类型中,子类型为double,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_double_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [123, 3, 4])" % table_name)
sql_list.append("insert into %s values (1001, [])" % table_name)
sql_list.append("insert into %s values (1002, [NULL])" % table_name)
sql_list.append("insert into %s values (1003, ['-1.234', '2.3456'])" % table_name)
sql_list.append("insert into %s values (1004, [1.0000000001])" % table_name)
sql_list.append("insert into %s values (1005, [10000000000000000000.1111111222222222])" % table_name)
sql_list.append("insert into %s values (1006, '1,3,4')" % table_name)
err_sql_list.append("insert into %s values (1007, ['ABCD'])" % table_name)
sql_list.append("insert into %s values (1008, NULL)" % table_name)
sql_list.append("insert into %s values (1009, [999999999999999999.9999999])" % table_name)
sql_list.append("insert into %s values (1010, '[1,2,3]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[123, 3, 4]'), (1001, u'[]'), (1002, u'[NULL]'),
(1003, u'[-1.234, 2.3456]'), (1004, u'[1.0000000001]'),
(1005, u'[1e+19]'), (1006, None), (1008, None),
(1009, u'[1e+18]'), (1010, u'[1, 2, 3]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_date():
"""
{
"title": "test_array_date",
"describe": "array类型中,子类型为date,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_date_list
local_file = FILE.test_array_date_local_file
hdfs_file = FILE.test_array_date_remote_file
expect_file = FILE.expe_array_date_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_date_json():
"""
{
"title": "test_array_date_json",
"describe": "array类型中,子类型为date,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_date_list
local_file = FILE.test_array_date_local_json
expect_file = FILE.expe_array_date_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_date_insert():
"""
{
"title": "test_array_date_insert",
"describe": "array类型中,子类型为date,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_date_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, ['0001-01-01', '9999-12-31'])" % table_name)
sql_list.append("insert into %s values (1001, ['10000-01-01'])" % table_name)
sql_list.append("insert into %s values (1002, ['20200202'])" % table_name)
sql_list.append("insert into %s values (1003, ['2020-01-01 12:23:12'])" % table_name)
sql_list.append("insert into %s values (1004, ['2020/12/12'])" % table_name)
err_sql_list.append("insert into %s values (1005, [2020-01-01])" % table_name)
sql_list.append("insert into %s values (1006, [])" % table_name)
sql_list.append("insert into %s values (1007, [NULL])" % table_name)
sql_list.append("insert into %s values (1008, NULL)" % table_name)
sql_list.append("insert into %s values (1009, ['ABC', 'DEF'])" % table_name)
err_sql_list.append("insert into %s values (1010, 123)" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
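# expected: year 10000 is out of the date range -> [NULL]; compact (20200202),
# datetime and slash-separated forms normalize to DATE; non-date strings
# ('ABC', 'DEF') load as NULL elements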
expect = ((1000, u'[0001-01-01, 9999-12-31]'), (1001, u'[NULL]'),
(1002, u'[2020-02-02]'), (1003, u'[2020-01-01]'),
(1004, u'[2020-12-12]'), (1006, u'[]'),
(1007, u'[NULL]'), (1008, None), (1009, u'[NULL, NULL]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_datetime():
"""
{
"title": "test_array_datetime",
"describe": "array类型中,子类型为datetime,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_datetime_list
local_file = FILE.test_array_datetime_local_file
hdfs_file = FILE.test_array_datetime_remote_file
expect_file = FILE.expe_array_datetime_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_datetime_json():
"""
{
"title": "test_array_datetime_json",
"describe": "array类型中,子类型为datetime,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_datetime_list
local_file = FILE.test_array_datetime_local_json
expect_file = FILE.expe_array_datetime_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_datetime_insert():
"""
{
"title": "test_array_datetime_insert",
"describe": "array类型中,子类型为datetime,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_datetime_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, ['0001-01-01 00:00:00', '9999-12-31 23:59:58'])" % table_name)
sql_list.append("insert into %s values (1003, ['2020-01-01'])" % table_name)
sql_list.append("insert into %s values (1004, ['2020-13-01'])" % table_name)
sql_list.append("insert into %s values (1005, ['0000-00-00 00:00:00'])" % table_name)
sql_list.append("insert into %s values (1006, ['2020/01/01 00-00-00'])" % table_name)
sql_list.append("insert into %s values (1007, ['20200101120000', 20200101120000])" % table_name)
sql_list.append("insert into %s values (1008, NULL)" % table_name)
sql_list.append("insert into %s values (1009, ['ABCD', 'AM'])" % table_name)
sql_list.append("insert into %s values (1010, '1,2,3')" % table_name)
sql_list.append("insert into %s values (1011, ['1975-06-18 01:47:17'])" % table_name)
sql_list.append("insert into %s values (1012, '[0001-01-01 00:00:00, 9999-12-31 23:59:58]')" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u'[]'), (1001, u'[NULL]'), (1002, u'[0001-01-01 00:00:00, 9999-12-31 23:59:58]'),
(1003, u'[2020-01-01 00:00:00]'), (1004, u'[NULL]'), (1005, u'[NULL]'),
(1006, u'[2020-01-01 00:00:00]'), (1007, u'[2020-01-01 12:00:00, 2020-01-01 12:00:00]'),
(1008, None), (1009, u'[NULL, NULL]'), (1010, None), (1011, u'[1975-06-18 01:47:17]'),
(1012, u'[0001-01-01 00:00:00, 9999-12-31 23:59:58]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_char():
"""
{
"title": "test_array_char",
"describe": "array类型中,子类型为char,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_char_list
local_file = FILE.test_array_char_local_file
hdfs_file = FILE.test_array_char_remote_file
expect_file = FILE.expe_array_char_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
@pytest.mark.skip()
def test_array_char_json():
"""
{
"title": "test_array_char_json",
"describe": "array类型中,子类型为char,stream导入json,导入成功,查询结果正确.",
"tag": "function,p0"
}
"""
# todo: there is a json escape-character issue
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_char_list
local_file = FILE.test_array_char_local_json
expect_file = FILE.expe_array_char_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
@pytest.mark.skip()
def test_array_char_insert():
"""
{
"title": "test_array_char_insert",
"describe": "array类型中,子类型为char,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
# todo: with strict=true the bad rows are not filtered; the setting does not take effect
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_char_list
client = common.create_workspace(database_name)
client.set_variables('enable_insert_strict', 'true')
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [''])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, NULL)" % table_name)
sql_list.append("insert into %s values (1003, ['true', 'false', null])" % table_name)
err_sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name)
sql_list.append("insert into %s values (1005, ['中文验证', '测试', '1kdads', '空 格'])" % table_name)
sql_list.append("insert into %s values (1006, ['!@#$^&*()', '#$<>:\\'{},'])" % table_name)
sql_list.append("insert into %s values (1009, ['hello', 'word'])" % table_name)
sql_list.append("insert into %s values (1010, '[]')" % table_name)
err_sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name)
err_sql_list.append("insert into %s values (1007, ['too long:%s', 'ttds'])" % (table_name, 'changdu' * 50))
sql_list.append("insert into %s values (1008, 'ddate')" % table_name) # ??
for i in sql_list:
client.execute(i)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u"['']"), (1001, u'[NULL]'), (1002, None), (1003, u"['true', 'false', NULL]"),
(1005, u"['\u4e2d\u6587\u9a8c\u8bc1', '\u6d4b\u8bd5', '1kdads', '\u7a7a \u683c']"),
(1006, u"['!@#$^&*()', '#$<>:'{},']"), (1008, None), (1009, u"['hello', 'word']"),
(1010, u'[]'))
util.check(ret, expect, True)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
# with strict off, the previously rejected sql now loads successfully
client.set_variables('enable_insert_strict', 'false')
err_sql_list = list()
err_sql_list.append("insert into %s values (1007, ['too long:%s', 'ttds'])" % (table_name, 'changdu' * 50))
err_sql_list.append("insert into %s values (1008, 'ddate')" % table_name)
for sql in err_sql_list:
util.assert_return(True, ' ', client.execute, sql)
sql = 'select * from %s where k1 in (1007, 1008)' % table_name
ret = client.execute(sql)
expect = ((1008, None),)
util.check(ret, expect)
client.clean(database_name)
def test_array_varchar():
"""
{
"title": "test_array_varchar",
"describe": "array类型中,子类型为varchar,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_varchar_list
local_file = FILE.test_array_varchar_local_file
hdfs_file = FILE.test_array_varchar_remote_file
expect_file = FILE.expe_array_varchar_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_varchar_json():
"""
{
"title": "test_array_varchar_json",
"describe": "array类型中,子类型为varchar,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_varchar_list
local_file = FILE.test_array_varchar_local_json
expect_file = FILE.expe_array_varchar_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_varchar_insert():
"""
{
"title": "test_array_varchar_insert",
"describe": "array类型中,子类型为varchar,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_varchar_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [''])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, NULL)" % table_name)
sql_list.append("insert into %s values (1003, ['true', 'false', null])" % table_name)
sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name)
sql_list.append("insert into %s values (1005, ['中文验证', '测试', '1kdads', '空 格'])" % table_name)
sql_list.append("insert into %s values (1006, ['!@#$^&*()', '#$<>:\\'{},'])" % table_name)
err_sql_list.append("insert into %s values (1007, ['too long:%s']" % (table_name, 'changdu' * 50))
sql_list.append("insert into %s values (1008, 'ddate')" % table_name)
sql_list.append("insert into %s values (1009, ['hello', 'word'])" % table_name)
sql_list.append("insert into %s values (1010, '[]')" % table_name)
err_sql_list.append("insert into %s values (1011, {'test'})" % table_name)
err_sql_list.append("insert into %s values (1012, [{},{}])" % table_name)
err_sql_list.append("insert into %s values (1013, [[],[]])" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u"['']"), (1001, u'[NULL]'), (1002, None), (1003, u"['true', 'false', NULL]"),
(1004, u"['1', '2', '3']"),
(1005, u"['\u4e2d\u6587\u9a8c\u8bc1', '\u6d4b\u8bd5', '1kdads', '\u7a7a \u683c']"),
(1006, u"['!@#$^&*()', '#$<>:'{},']"), (1008, None),
(1009, u"['hello', 'word']"), (1010, u'[]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_string():
"""
{
"title": "test_array_string",
"describe": "array类型中,子类型为string,stream、broker导入csv,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_string_list
local_file = FILE.test_array_varchar_local_file
hdfs_file = FILE.test_array_varchar_remote_file
expect_file = FILE.expe_array_string_file
load_check(database_name, table_name, column_list, local_file, hdfs_file, expect_file)
def test_array_string_json():
"""
{
"title": "test_array_string_json",
"describe": "array类型中,子类型为string,stream导入json,导入成功,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_string_list
local_file = FILE.test_array_varchar_local_json
expect_file = FILE.expe_array_string_file
load_check(database_name, table_name, column_list, local_file, None, expect_file,
format='json', strip_outer_array=True)
def test_array_string_insert():
"""
{
"title": "test_array_string_insert",
"describe": "array类型中,子类型为string,insert导入,查询结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
column_list = SCHEMA.array_string_list
client = common.create_workspace(database_name)
ret = client.create_table(table_name, column_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
# insert
sql_list = list()
err_sql_list = list()
sql_list.append("insert into %s values (1000, [''])" % table_name)
sql_list.append("insert into %s values (1001, [NULL])" % table_name)
sql_list.append("insert into %s values (1002, NULL)" % table_name)
sql_list.append("insert into %s values (1003, ['true', 'false', null])" % table_name)
sql_list.append("insert into %s values (1004, [1, 2, 3])" % table_name)
sql_list.append("insert into %s values (1005, ['中文验证', '测试', '1kdads', '空 格'])" % table_name)
sql_list.append("insert into %s values (1006, ['!@#$^&*()', '#$<>:\\'{},'])" % table_name)
sql_list.append("insert into %s values (1007, ['long:%s'])" % (table_name, 'changdu' * 50))
sql_list.append("insert into %s values (1008, 'ddate')" % table_name)
sql_list.append("insert into %s values (1009, ['hello', 'word'])" % table_name)
sql_list.append("insert into %s values (1010, '[]')" % table_name)
err_sql_list.append("insert into %s values (1011, {'test'})" % table_name)
err_sql_list.append("insert into %s values (1012, [{},{}])" % table_name)
err_sql_list.append("insert into %s values (1013, [[],[]])" % table_name)
for i in sql_list:
client.execute(i)
for sql in err_sql_list:
util.assert_return(False, ' ', client.execute, sql)
sql = 'select * from %s order by k1' % table_name
ret = client.execute(sql)
expect = ((1000, u"['']"), (1001, u'[NULL]'), (1002, None),
(1003, u"['true', 'false', NULL]"), (1004, u"['1', '2', '3']"),
(1005, u"['\u4e2d\u6587\u9a8c\u8bc1', '\u6d4b\u8bd5', '1kdads', '\u7a7a \u683c']"),
(1006, u"['!@#$^&*()', '#$<>:'{},']"),
(1007, u"['long:%s']" % ('changdu' * 50)), (1008, None),
(1009, u"['hello', 'word']"), (1010, u'[]'))
util.check(ret, expect)
client.clean(database_name)
def test_array_parquet():
"""
{
"title": "test_array_parquet",
"describe": "parquet文件,列类型一个为string,一个是hive生成的array复杂类型列,分别导入,不支持hive生成的格式,但be不core",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
client = common.create_workspace(database_name)
ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_parquet_string,
table_name, format_as='parquet')
ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True)
assert ret, 'broker load failed'
assert client.verify(FILE.expe_array_table_file, table_name)
column_name_list = util.get_attr(SCHEMA.array_table_list, 0)
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_parquet_hive,
table_name, format_as='parquet', column_name_list=column_name_list)
ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True)
assert not ret, 'expected broker load to fail'
client.clean(database_name)
def test_array_orc():
"""
{
"title": "test_array_orc",
"describe": "orc文件,列类型一个为string,一个是hive生成的array类型列,分别导入,支持hive生成的格式,todo check",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
client = common.create_workspace(database_name)
ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_orc_string, table_name, format_as='orc')
ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True)
assert ret, 'broker load failed'
assert client.verify(FILE.expe_array_table_file, table_name)
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_parquet_hive,
table_name, format_as='orc')
ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True)
assert not ret, 'expected broker load to fail'
assert client.truncate(table_name), 'truncate table failed'
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_orc_hive,
table_name, format_as='orc')
ret = client.batch_load(util.get_label(), load_data_info, broker=broker_info, is_wait=True)
assert ret, 'broker load failed'
# assert client.verify(FILE.expe_array_table_file, table_name)
# client.clean(database_name)
def test_array_json():
"""
{
"title": "test_array_orc",
"describe": "json文件导入,成功,结果正确",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
client = common.create_workspace(database_name)
ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key,
set_null=True)
assert ret, 'create table failed'
ret = client.stream_load(table_name, FILE.test_array_table_local_json, format='json',
strip_outer_array=True, num_as_string='true')
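# num_as_string makes the json reader treat numeric values as strings so the
# largeint/decimal columns keep full precision; a7/a8/a9 are then compared with
# type-aware converters from palo_types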
assert ret, 'stream load failed'
common.check_by_file(FILE.expe_array_table_file, table_name, client=client,
a7=palo_types.ARRAY_DECIMAL, a8=palo_types.ARRAY_FLOAT,
a9=palo_types.ARRAY_DOUBLE)
client.clean(database_name)
def test_array_strict():
"""
{
"title": "test_array_strict",
"describe": "验证strict模式,分别为not Null列,strict为true,strict为false,构造错误数据",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
client = common.create_workspace(database_name)
schema = SCHEMA.array_int_list
tb_n = table_name + '_n' # not null
tb_s = table_name + '_s' # strict true
tb_ns = table_name + '_ns' # strict false
ret = client.create_table(tb_s, schema, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
ret = client.create_table(tb_ns, schema, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
ret = client.create_table(tb_n, schema, keys_desc=SCHEMA.duplicate_key, set_null=False)
assert ret, 'create table failed'
ret = client.stream_load(tb_n, FILE.test_array_mix_file_local, max_filter_ratio=1, column_separator='|')
assert ret, 'stream load failed'
ret = client.stream_load(tb_s, FILE.test_array_mix_file_local, strict_mode='true',
max_filter_ratio=1, column_separator='|')
assert ret, 'stream load failed'
ret = client.stream_load(tb_ns, FILE.test_array_mix_file_local, strict_mode='false',
max_filter_ratio=1, column_separator='|')
assert ret, 'stream load failed'
tb_i = table_name + '_i' # insert test
ret = client.create_table(tb_i, schema, keys_desc=SCHEMA.duplicate_key, set_null=True)
assert ret, 'create table failed'
insert_sql = 'insert into %s values(1, "abc")' % tb_i
# todo
# client.set_variables('enable_insert_strict', 'true')
# util.assert_return(False, ' ', client.execute, insert_sql)
client.set_variables('enable_insert_strict', 'false')
util.assert_return(True, None, client.execute, insert_sql)
# client.clean(database_name)
def test_array_stream_param():
"""
{
"title": "test_array_stream_param",
"describe": "导入时,设置的一些参数,如set,where,columns,partition",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
client = common.create_workspace(database_name)
check_tb = 'check_tb'
ret = client.create_table(check_tb, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key)
assert ret, 'create table failed'
ret = client.stream_load(check_tb, FILE.test_array_table_local_file, column_separator='|')
assert ret, 'stream load failed'
ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key,
partition_info=SCHEMA.baseall_tinyint_partition_info)
assert ret, 'create table failed'
# column, where, partition
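# the trailing 'a1=[]' entry is a set-column expression assigning an empty
# array to a1; 'where' filters rows during load and 'partitions' restricts the
# target partitions, so filtered-out rows count against max_filter_ratio
# (hence the first attempt, without it, is expected to fail)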
columns = ['k1', 'b1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8',
'a9', 'a10', 'a11', 'a12', 'a13', 'a14', 'a1=[]']
ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|',
column_name_list=columns, where='size(a2) > 270', partitions='p1, p2')
assert not ret, 'expect stream load failed'
columns = ['k1', 'b1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8',
'a9', 'a10', 'a11', 'a12', 'a13', 'a14', 'a1=[]']
ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|',
column_name_list=columns, where='size(a2) > 270', partitions='p1, p2',
max_filter_ratio=0.8)
assert ret, 'stream load failed'
sql1 = 'select * from %s order by k1' % table_name
sql2 = 'select k1, [], a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, a14 from %s ' \
'where size(a2) > 270 and k1 < 0 order by k1' % check_tb
common.check2(client, sql1=sql1, sql2=sql2)
# array function
client.truncate(table_name)
columns = ['k1', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9',
'a10', 'a11', 'a12', 'a13', 'b14',
'a14=array_sort(array_union(cast(a12 as array<string>), cast(a13 as array<string>)))']
ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|',
column_name_list=columns, where='size(a2) > 270', partitions='p1, p2',
max_filter_ratio=0.8)
assert ret, 'stream load failed'
sql1 = 'select * from %s order by k1' % table_name
sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, ' \
'array_sort(array_union(a12, a13)) from %s where size(a2) > 270 and k1 < 0 order by k1' % check_tb
common.check2(client, sql1=sql1, sql2=sql2)
# array function, todo core
client.truncate(table_name)
columns = ['k1', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7', 'a8', 'a9', 'a10',
'a11', 'a12', 'a13', 'b14', 'a14=array_remove(cast(a5 as array<bigint>), 1)']
ret = client.stream_load(table_name, FILE.test_array_table_local_file, column_separator='|',
column_name_list=columns, where='size(a2) > 270', partitions='p1, p2',
max_filter_ratio=0.8)
assert ret, 'stream load failed'
sql1 = 'select * from %s order by k1' % table_name
sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, array_sort(array_union(a12, a13)) ' \
'from %s where size(a2) > 270 and k1 < 0 order by k1' % check_tb
common.check2(client, sql1=sql1, sql2=sql2)
# client.clean(database_name)
def test_array_broker_param():
"""
{
"title": "test_array_broker_param",
"describe": "导入时,设置的一些参数,如set,where,columns,partition",
"tag": "function,p0"
}
"""
database_name, table_name, index_name = util.gen_name_list()
client = common.create_workspace(database_name)
check_tb = 'check_tb'
ret = client.create_table(check_tb, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key)
assert ret, 'create table failed'
ret = client.stream_load(check_tb, FILE.test_array_table_local_file, column_separator='|')
assert ret, 'stream load failed'
ret = client.create_table(table_name, SCHEMA.array_table_list, keys_desc=SCHEMA.duplicate_key,
partition_info=SCHEMA.baseall_tinyint_partition_info)
assert ret, 'create table failed'
# broker load & check
column_name_list = ['k1', 'a1', 'a2', 'a3', 'a4', 'a5', 'a6', 'a7',
'a8', 'a9', 'a10', 'a11', 'a12', 'a13', 'b14']
set_list = ['a14=array_union(cast(b14 as array<string>), cast(a13 as array<string>))']
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name,
partition_list=['p1', 'p2'],
column_name_list=column_name_list, set_list=set_list,
column_terminator='|', where_clause='size(a2) > 270')
ret = client.batch_load(util.get_label(), load_data_info, max_filter_ratio=0.8, broker=broker_info, is_wait=True)
assert ret
sql1 = 'select * from %s order by k1' % table_name
sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, array_union(a14, a13) from %s ' \
'where size(a2) > 270 and k1 < 0 order by k1' % check_tb
common.check2(client, sql1=sql1, sql2=sql2)
client.truncate(table_name)
set_list = ['a14=[]']
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name,
partition_list=['p1', 'p2'],
column_name_list=column_name_list, set_list=set_list,
column_terminator='|', where_clause='size(a2) > 270')
ret = client.batch_load(util.get_label(), load_data_info, max_filter_ratio=0.8, broker=broker_info, is_wait=True)
assert ret
sql1 = 'select * from %s order by k1' % table_name
sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, [] from %s ' \
'where size(a2) > 270 and k1 < 0 order by k1' % check_tb
common.check2(client, sql1=sql1, sql2=sql2)
client.truncate(table_name)
set_list = ['a14=[]']
load_data_info = palo_client.LoadDataInfo(FILE.test_array_table_remote_file, table_name,
partition_list=['p1', 'p2'],
column_name_list=column_name_list, set_list=set_list,
column_terminator='|', where_clause='size(array_distinct(a2)) = 1')
ret = client.batch_load(util.get_label(), load_data_info, max_filter_ratio=0.8, broker=broker_info, is_wait=True)
assert ret
sql1 = 'select * from %s order by k1' % table_name
sql2 = 'select k1, a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, a13, [] from %s ' \
'where size(array_distinct(a2)) = 1 and k1 < 0 order by k1' % check_tb
common.check2(client, sql1=sql1, sql2=sql2)
client.clean(database_name)
if __name__ == '__main__':
setup_module()
# test_array_decimal() # todo: 10000000000000000000.1111111222222222 vs 100000000000000000.001111111
# test_array_char()  # todo: incorrect results for data containing commas; json escape-character issue
# test_array_stream_param() # todo stream load core
test_array_strict() # todo