blob: bbf13b632c5d2971cfb0bedb20732369f76629a5 [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
file: stream_test_simple.py
Test stream load with HLL columns and with NULL / NOT NULL columns.
"""
import os
import sys
import time
# Make the directory two levels up importable so the project-local `data` / `lib`
# imports below can resolve (which of the two added paths provides them is not
# visible here).
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.append(file_dir)
# NOTE: file_dir is deliberately rebound to the parent directory. The data file
# paths built later in this module ("%s/data/..." % file_dir) are relative to
# this directory, not the one above.
file_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.append(file_dir)
# These project-local imports must come after the sys.path changes above.
from data import schema as DATA
from lib import palo_config
from lib import palo_client
from lib import common
# Shortcuts to the shared test configuration and the logging helpers from lib.palo_client.
config = palo_config.config
LOG, L = palo_client.LOG, palo_client.L
# FE connection settings used by every test in this module.
host, query_port, http_port = config.fe_host, config.fe_query_port, config.fe_http_port
user, password = config.fe_user, config.fe_password
class TestHll(object):
    """Stream load into tables with an HLL column, then copy them via INSERT ... SELECT.

    Each test creates a source table and a copy table from the same schema.
    It stream-loads a data file into the source table (``v1 = hll_hash(k1)``,
    ``v2 = 1``) and copies the rows with ``insert into ... select``. Both tables
    are then verified with :meth:`check`.
    """
    database_name = 'stream_load_hll_test_db'
    # Shared client; assigned by setup_class.
    client = None

    @classmethod
    def setup_class(cls):
        """Recreate the test database and keep one client for all tests."""
        client = palo_client.get_client(host, query_port, user=user, password=password)
        client.clean(cls.database_name)
        ret = client.create_database(cls.database_name)
        LOG.info(L('create db', database_name=cls.database_name, ret=ret))
        assert ret
        cls.client = client

    def setUp(self):
        """No per-test setup; kept for compatibility."""
        pass

    @staticmethod
    def _hll_load_columns(dirty_column_count):
        """Return the stream-load column mapping for the HLL tests.

        The mapping is ``k1``, the derived ``v1``/``v2`` columns, then
        ``dirty1`` .. ``dirtyN``. The dirty names presumably absorb extra
        data-file columns that the table does not store; confirm against
        the data files.
        """
        return (['k1', 'v1=hll_hash(k1)', 'v2=1'] +
                ['dirty%d' % i for i in range(1, dirty_column_count + 1)])

    @staticmethod
    def _hll_relative_error(exact, estimate):
        """Return ``|exact - estimate| / exact`` as a float (``exact`` must be non-zero)."""
        return abs(exact - estimate) / float(exact)

    def _run_hll_case(self, table_name, insert_table_name, schema, dirty_column_count,
                      data_file_name, sleep_seconds=0):
        """Create, stream load, copy and check one pair of HLL tables.

        Args:
            table_name: table that receives the stream load.
            insert_table_name: table filled by ``insert into ... select`` from table_name.
            schema: column list used to create both tables.
            dirty_column_count: number of ``dirtyN`` placeholders in the load mapping.
            data_file_name: file name under ``<file_dir>/data/STREAM_LOAD/``.
            sleep_seconds: optional pause before the copy table is checked.
        """
        self.client.use(self.database_name)
        distribution_type = DATA.hash_distribution_info
        assert self.client.create_table(table_name, schema,
                                        distribution_info=distribution_type)
        assert self.client.create_table(insert_table_name, schema,
                                        distribution_info=distribution_type)
        data_file = '%s/data/STREAM_LOAD/%s' % (file_dir, data_file_name)
        ret = self.client.stream_load(table_name, data_file, max_filter_ratio=0.1, port=http_port,
                                      column_name_list=self._hll_load_columns(dirty_column_count))
        assert ret
        self.check(table_name)
        # Rebuild the HLL column from k1 instead of copying v1 directly.
        sql = 'insert into %s select k1, hll_hash(k1), v2 from %s' % (insert_table_name,
                                                                     table_name)
        ret = self.client.execute(sql)
        LOG.info(L('Stream insert ret', ret=ret))
        assert ret == ()
        if sleep_seconds:
            time.sleep(sleep_seconds)
        self.check(insert_table_name)

    def test_hll_tinyint(self):
        """
        {
        "title": "test_stream_simple1.test_hll_tinyint",
        "describe": "test hll tinyint",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('tinyint_hll_test', 'tinyint_hll_insert',
                           DATA.hll_tinyint_column_list, 4, 'test_hash_tinyint.data',
                           sleep_seconds=3)

    def test_hll_smallint(self):
        """
        {
        "title": "test_stream_simple1.test_hll_smallint",
        "describe": "test hll smallint",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('smallint_hll_test', 'smallint_hll_insert',
                           DATA.hll_smallint_column_list, 4, 'test_hash_smallint.data')

    def test_hll_int(self):
        """
        {
        "title": "test_stream_simple1.test_hll_int",
        "describe": "test hll int",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('int_hll_test', 'int_hll_insert',
                           DATA.hll_int_column_list, 4, 'test_hash_int.data')

    def test_hll_bigint(self):
        """
        {
        "title": "test_stream_simple1.test_hll_bigint",
        "describe": "test hll bigint",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('bigint_hll_test', 'bigint_hll_insert',
                           DATA.hll_bigint_column_list, 4, 'test_hash_bigint.data')

    def test_hll_largeint(self):
        """
        {
        "title": "test_stream_simple1.test_hll_largeint",
        "describe": "test hll largeint",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('largeint_hll_test', 'largeint_hll_insert',
                           DATA.hll_largeint_column_list, 4, 'test_hash_largeint.data')

    def test_hll_char(self):
        """
        {
        "title": "test_stream_simple1.test_hll_char",
        "describe": "test hll char",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('char_hll_test', 'char_hll_insert',
                           DATA.hll_char_column_list, 1, 'test_hash_char_normal.data')

    def test_hll_varchar(self):
        """
        {
        "title": "test_stream_simple1.test_hll_varchar",
        "describe": "test hll varchar",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('varchar_hll_test', 'varchar_hll_insert',
                           DATA.hll_varchar_column_list, 1, 'test_hash_varchar_least.data')

    def test_hll_date(self):
        """
        {
        "title": "test_stream_simple1.test_hll_date",
        "describe": "test hll date",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('date_hll_test', 'date_hll_insert',
                           DATA.hll_date_column_list, 3, 'test_hash_date.data')

    def test_hll_datetime(self):
        """
        {
        "title": "test_stream_simple1.test_hll_datetime",
        "describe": "test hll datetime",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('datetime_hll_test', 'datetime_hll_insert',
                           DATA.hll_datetime_column_list, 3, 'test_hash_datetime.data')

    def test_hll_decimal(self):
        """
        {
        "title": "test_stream_simple1.test_hll_decimal",
        "describe": "test hll decimal",
        "tag": "function,p1"
        }
        """
        self._run_hll_case('decimal_hll_test', 'decimal_hll_insert',
                           DATA.hll_decimal_column_list, 4, 'test_hash_decimal_normal.data')

    def check(self, table_name, tolerance=0.2):
        """Assert that hll_union_agg(v1) is within ``tolerance`` of count(k1).

        The relative error is measured in both directions, so an HLL
        over-estimate fails just like an under-estimate.

        Args:
            table_name: table to verify; it must contain at least one row.
            tolerance: maximum allowed relative error (default 0.2).
        """
        ret1 = self.client.execute('select hll_union_agg(v1) from %s' % table_name)
        ret2 = self.client.execute('select count(k1) from %s' % table_name)
        LOG.info(L('CHECK RET', ret1=ret1, ret2=ret2))
        row_count = int(ret2[0][0])
        # Without this guard an empty table surfaces as a ZeroDivisionError.
        assert row_count > 0, 'no rows in %s' % table_name
        error = self._hll_relative_error(row_count, int(ret1[0][0]))
        assert error < tolerance, (table_name, error)
class TestNull(object):
    """Test stream load and INSERT ... SELECT on nullable / not-null tables."""
    database_name = 'stream_load_null_test_db'
    # Shared client; assigned by setup_class.
    client = None

    @classmethod
    def setup_class(cls):
        """Recreate the test database and keep one client, already switched to it."""
        client = palo_client.get_client(host, query_port, user=user, password=password)
        client.clean(cls.database_name)
        ret = client.create_database(cls.database_name)
        assert ret
        client.use(cls.database_name)
        cls.client = client

    def _load_and_copy(self, table_name, expect_name, order_by, set_null,
                       bloom_filter_column_list=None, insert_set_null=True):
        """Stream load data_5 into a new table, copy it, and verify both tables.

        Both the loaded table and its ``insert_<table_name>`` copy are checked
        against the same expected file.

        Args:
            table_name: table that receives the stream load.
            expect_name: expected-result file name under ``<file_dir>/data/NULL/``.
            order_by: ORDER BY clause used for the verification query.
            set_null: ``set_null`` flag for the loaded table.
            bloom_filter_column_list: optional bloom filter columns for the loaded table.
            insert_set_null: ``set_null`` flag for the copy table.
        """
        self.client.use(self.database_name)
        column_list = DATA.types_kv_column_list
        distribution_type = DATA.baseall_distribution_info
        create_kwargs = {'distribution_info': distribution_type, 'set_null': set_null}
        # Only pass the bloom filter option when a test asks for it.
        if bloom_filter_column_list is not None:
            create_kwargs['bloom_filter_column_list'] = bloom_filter_column_list
        assert self.client.create_table(table_name, column_list, **create_kwargs)
        data_file = '%s/data/NULL/data_5' % file_dir
        expect_file = '%s/data/NULL/%s' % (file_dir, expect_name)
        assert self.client.stream_load(table_name, data_file, max_filter_ratio=0.2)
        sql = 'select * from %s order by %s' % (table_name, order_by)
        assert common.check_by_file(expect_file, sql=sql, client=self.client)
        insert_table_name = 'insert_' + table_name
        assert self.client.create_table(insert_table_name, column_list,
                                        distribution_info=distribution_type,
                                        set_null=insert_set_null)
        insert_sql = 'insert into %s select * from %s' % (insert_table_name, table_name)
        ret = self.client.execute(insert_sql)
        LOG.info(L('Stream insert ret', ret=ret))
        assert ret == ()
        # Verify the copied table itself; the old code re-queried the source table here.
        insert_check_sql = 'select * from %s order by %s' % (insert_table_name, order_by)
        assert common.check_by_file(expect_file, sql=insert_check_sql, client=self.client)

    def test_null(self):
        """
        {
        "title": "test_stream_simple1.test_null",
        "describe": "stream load null",
        "tag": "function,p1"
        }
        """
        self._load_and_copy('base_null', 'verify_5', 'k2 nulls last', set_null=True)

    def test_not_null(self):
        """
        {
        "title": "test_stream_simple1.test_not_null",
        "describe": "test stream load not null",
        "tag": "function,p1"
        }
        """
        # NOTE(review): the copy of the NOT NULL table is created nullable
        # (set_null=True), as in the original test; confirm this is intended.
        self._load_and_copy('base_not_null', 'verify_5_2', 'k2', set_null=False,
                            insert_set_null=True)

    def test_bloom_filter(self):
        """
        {
        "title": "test_stream_simple1.test_bloom_filter",
        "describe": "test stream load with bloom filter",
        "tag": "function,p1"
        }
        """
        self._load_and_copy('base_bloom_filter', 'verify_5', 'k2 nulls last', set_null=True,
                            bloom_filter_column_list=['k5', 'k7', 'k9'])