# (source-viewer residue, not part of the module) blob: 3e76c15c285c1f68ae698c1694bd11e5fd9f39ba [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""
test bloom filter
Date: 2015/10/09 17:32:19
"""
import time
import pymysql
from data import bloom_filter as DATA
from lib import palo_client
from lib import palo_config
from lib import util
from lib import common
# Logger taken from the palo client library, plus the helper `L` that this file
# uses to build the message passed to LOG.info().
LOG = palo_client.LOG
L = palo_client.L
# Shared client handle; it stays None until setup_module() assigns it.
client = None
# Cluster and broker settings read from palo_config.
config = palo_config.config
broker_info = palo_config.broker_info
def setup_module():
    """
    Module-level setup.

    Publishes two module globals -- the MySQL database name and the shared
    Palo client -- and then runs init_mysql() against the configured MySQL
    server with the bundled SQL script.
    """
    global client, mysql_db
    # init_mysql() reads mysql_db, so it has to be set before that call.
    mysql_db = 'bloom_filter_a_mysql'
    client = palo_client.get_client(
        config.fe_host,
        config.fe_query_port,
        user=config.fe_user,
        password=config.fe_password,
        http_port=config.fe_http_port,
    )
    init_mysql(
        config.mysql_host,
        config.mysql_port,
        "./data/bloom_filter/init_mysql.sql",
        user=config.mysql_user,
        password=config.mysql_password,
    )
def init_mysql(host, port, sql_file, user="root", password=""):
    """
    Initialize the MySQL reference data.

    Connects to MySQL and checks whether a database matching the module
    global ``mysql_db`` is already listed. If exactly one match is found the
    function returns without doing anything. Otherwise it reads ``sql_file``,
    splits it on ``;`` and executes every non-empty statement after replacing
    the text ``test`` with ``mysql_db``, then commits.

    Args:
        host: MySQL host.
        port: MySQL port.
        sql_file: path of the SQL script to run.
        user: MySQL user name.
        password: MySQL password.
    """
    connect = pymysql.connect(host=host, port=port, user=user, passwd=password)
    try:
        cursor = connect.cursor()
        cursor.execute('show databases like "%s"' % mysql_db)
        if len(cursor.fetchall()) == 1:
            # The database is already there: nothing to initialize.
            return
        # Read the script through a context manager so the handle is closed.
        with open(sql_file) as sql_fp:
            init_sql = sql_fp.read()
        for statement in init_sql.split(";"):
            statement = statement.strip()
            if not statement:
                continue
            cursor.execute(statement.replace("test", mysql_db))
        connect.commit()
    finally:
        # Release the connection on every path, including the early return.
        connect.close()
def execute(host, port, sql, user="root", password=""):
    """
    Connect to MySQL, run one statement and return all fetched rows.

    Args:
        host: MySQL host.
        port: MySQL port.
        sql: statement to execute.
        user: MySQL user name.
        password: MySQL password.

    Returns:
        The result of ``cursor.fetchall()`` for ``sql``.

    Raises:
        AssertionError: if running the statement fails; the message carries
            the text of the underlying error.
    """
    connect = pymysql.connect(host=host, port=port, user=user, passwd=password)
    try:
        cursor = connect.cursor()
        LOG.info(L('mysql check sql', sql=sql))
        cursor.execute(sql)
        return cursor.fetchall()
    except Exception as error:
        # Raise explicitly instead of `assert False` so the failure is still
        # reported when Python runs with -O (which strips assert statements).
        raise AssertionError("execute error. %s" % str(error))
    finally:
        # The connection is local to this call; always release it.
        connect.close()
def test_base():
    """
    {
    "title": "test_sys_bloom_filter_a.test_base",
    "describe": "支持的所有类型上使用bloom filter,导入查询正确",
    "tag": "system,p1"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: bloom filter on every supported column type; the load and the
    # follow-up verification must both be correct.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    assert client.create_table(tbl_name, DATA.schema,
                               bloom_filter_column_list=DATA.bloom_filter_column_list)
    # Check the schema: every bloom filter column must be reported as such.
    for column in client.get_index_schema(tbl_name):
        if column[0] not in DATA.bloom_filter_column_list:
            continue
        assert "BLOOM_FILTER" in column[5], "%s should be bloom filter" % str(column)
    load_info = palo_client.LoadDataInfo(DATA.file_path, tbl_name)
    loaded = client.batch_load(util.get_label(), load_info, max_filter_ratio=0.05,
                               is_wait=True, broker=broker_info)
    assert loaded
    assert client.verify(DATA.expected_data_file_list, tbl_name)
    client.clean(db_name)
def test_tinyint_bloom_filter():
    """
    {
    "title": "test_sys_bloom_filter_a.test_tinyint_bloom_filter",
    "describe": "tinyint类型列不可以使用bloom filter",
    "tag": "system,p1,fuzz"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: tinyint columns must not be accepted as bloom filter columns.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    # (schema, bloom filter columns) pairs that create_table has to reject.
    rejected_cases = (
        (DATA.schema, DATA.tinyint_key_list),
        (DATA.schema, DATA.tinyint_value_list),
        (DATA.schema_tinyint, DATA.tinyint_value_replace_list),
    )
    for schema, bf_columns in rejected_cases:
        try:
            client.create_table(tbl_name, schema, bloom_filter_column_list=bf_columns)
        except Exception as error:
            # Expected path: the failure is only printed.
            print(error)
        else:
            # Table creation succeeding here means the check is missing.
            assert False
    client.clean(db_name)
def test_illegal_bloom_filter():
    """
    {
    "title": "test_sys_bloom_filter_a.test_illegal_bloom_filter",
    "describe": "非REPLACE聚合方式的value列不可以使用bloom filter",
    "tag": "system,p1,fuzz"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: value columns whose aggregation is not REPLACE must not be
    # accepted as bloom filter columns.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    for bf_columns in DATA.illegal_bloom_filter_column_list:
        try:
            client.create_table(tbl_name, DATA.schema,
                                bloom_filter_column_list=bf_columns)
        except Exception as error:
            # Expected path: the failure is only printed.
            print(error)
        else:
            # Every entry of the list has to be rejected.
            assert False
    client.clean(db_name)
def test_illegal_bloom_filter_1():
    """
    {
    "title": "test_sys_bloom_filter_a.test_illegal_bloom_filter_1",
    "describe": "聚合表的value列不可以使用bloom filter",
    "tag": "system,p1,fuzz"
    }
    """
    # NOTE(review): the JSON docstring looks like case metadata read by
    # tooling. Its title used to duplicate test_illegal_bloom_filter's title;
    # it now names this function, like every other case in the file.
    # Scenario: value columns of an aggregate table do not support creating a
    # bloom filter.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    for bf_columns in DATA.value_list:
        try:
            client.create_table(tbl_name, DATA.schema,
                                bloom_filter_column_list=bf_columns)
        except Exception as error:
            # Expected path: the failure is only printed.
            print(error)
        else:
            # Every entry of DATA.value_list has to be rejected.
            assert False
    client.clean(db_name)
def test_float_bloom_filter():
    """
    {
    "title": "test_sys_bloom_filter_a.test_float_bloom_filter",
    "describe": "FLOAT DOUBLE类型的列不可以使用bloom filter",
    "tag": "system,p1,fuzz"
    }
    """
    # NOTE(review): the JSON docstring looks like case metadata read by
    # tooling. Its title was missing the "test_sys_bloom_filter_a." prefix
    # that every other case in the file carries; the prefix is added here.
    # Scenario: FLOAT and DOUBLE columns must not be accepted as bloom filter
    # columns.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    # Both column lists are tried against the same float schema.
    for bf_columns in (DATA.float_value_replace_list, DATA.double_value_replace_list):
        try:
            client.create_table(tbl_name, DATA.schema_float,
                                bloom_filter_column_list=bf_columns)
        except Exception as error:
            # Expected path: the failure is only printed.
            print(error)
        else:
            # Table creation succeeding here means the check is missing.
            assert False
    client.clean(db_name)
def test_default_value():
    """
    {
    "title": "test_sys_bloom_filter_a.test_default_value",
    "describe": "默认值使用bloom filter列",
    "tag": "system,p1"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: bloom filter columns that take their default values.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    assert client.create_table(tbl_name, DATA.schema_default_value,
                               bloom_filter_column_list=DATA.default_value_list)
    # Only "increment_key" is named in the load's column list.
    load_info = palo_client.LoadDataInfo(DATA.default_value_file_path, tbl_name,
                                         column_name_list=["increment_key"])
    loaded = client.batch_load(util.get_label(), load_info, max_filter_ratio=0.05,
                               is_wait=True, broker=broker_info)
    assert loaded
    assert client.verify(DATA.expected_default_value_file, tbl_name)
    client.clean(db_name)
def test_partition_null_replica():
    """
    {
    "title": "test_sys_bloom_filter_a.test_partition_null_replica",
    "describe": "多分区建表,数据导入时部分副本数据为空",
    "tag": "system,p1"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: multi-partition table where some replicas receive no data
    # during the load.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    assert client.create_table(tbl_name, DATA.schema,
                               partition_info=DATA.partition_info,
                               bloom_filter_column_list=DATA.bloom_filter_column_list)
    load_info = palo_client.LoadDataInfo(DATA.file_path, tbl_name)
    loaded = client.batch_load(util.get_label(), load_info, max_filter_ratio=0.05,
                               is_wait=True, broker=broker_info)
    assert loaded
    assert client.verify(DATA.expected_data_file_list, tbl_name)
    negative_key_sql = "SELECT tinyint_key from %s.%s where tinyint_key<0" \
        % (db_name, tbl_name)
    # Only these three negative keys are allowed to come back.
    for row in client.execute(negative_key_sql):
        assert row[0] in (-1, -2, -128)
    client.clean(db_name)
def test_be_ce():
    """
    {
    "title": "test_sys_bloom_filter_a.test_be_ce",
    "describe": "多次导入BE、CE结束后数据正确,可以skip",
    "tag": "system,p1"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: data is still correct after several loads and after BE/CE
    # have finished.
    db_name, tbl_name, _ = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    assert client.create_table(tbl_name, DATA.schema,
                               partition_info=DATA.partition_info,
                               distribution_info=DATA.distribution_info,
                               bloom_filter_column_list=DATA.bloom_filter_column_list)
    # One stream load per local file; the unused loop counter was dropped.
    for local_file in DATA.be_ce_local_file_list:
        assert client.stream_load(tbl_name, local_file, max_filter_ratio=0.5)
    # TODO check finish BE/CE
    assert client.verify(DATA.expected_data_file_list, tbl_name)
    client.clean(db_name)
def test_rollup_with_bloom_filter():
    """
    {
    "title": "test_sys_bloom_filter_a.test_rollup_with_bloom_filter",
    "describe": "rollup表使用bloom filter",
    "tag": "system,p1"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: a rollup table that uses bloom filter.
    db_name, tbl_name, rollup_name = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    assert client.create_table(tbl_name, DATA.schema,
                               partition_info=DATA.partition_info,
                               distribution_info=DATA.distribution_info,
                               bloom_filter_column_list=DATA.bloom_filter_column_list)
    # First half of the files is loaded before the rollup, the rest after.
    # The split index is computed once; the unused loop counter was dropped.
    load_files = DATA.be_ce_local_file_list
    half = len(load_files) // 2
    for local_file in load_files[:half]:
        assert client.stream_load(tbl_name, local_file, max_filter_ratio=0.5)
    sql = ("SELECT int_key, character_key, datetime_key FROM %s.%s "
           "group by int_key, character_key, datetime_key") % (db_name, tbl_name)
    expected = client.execute(sql)
    # Build the rollup, then the same query has to give the same rows.
    assert client.create_rollup_table(tbl_name, rollup_name,
                                      DATA.rollup_with_bloom_filter, is_wait=True)
    actual = client.execute(sql)
    util.check(expected, actual, force_order=True)
    for local_file in load_files[half:]:
        assert client.stream_load(tbl_name, local_file, max_filter_ratio=0.5)
    assert client.verify(DATA.expected_data_file_list, tbl_name)
    # Check the rollup data against the same aggregation run on MySQL.
    mysql_sql = ("SELECT int_key, character_key, datetime_key "
                 "from %s.all_type group by int_key, character_key, datetime_key") % mysql_db
    mysql_result = execute(config.mysql_host, config.mysql_port, mysql_sql,
                           user=config.mysql_user, password=config.mysql_password)
    palo_result = client.execute(sql)
    util.check(palo_result, mysql_result, force_order=True)
    client.clean(db_name)
def test_rollup_without_bf():
    """
    {
    "title": "test_sys_bloom_filter_a.test_rollup_without_bf",
    "describe": "rollup中不包含bloom filter",
    "tag": "system,p1"
    }
    """
    # NOTE(review): the JSON docstring is kept verbatim; it looks like case
    # metadata read by tooling -- confirm before editing it.
    # Scenario: the rollup does not contain bloom filter.
    db_name, tbl_name, rollup_name = util.gen_name_list()
    client.clean(db_name)
    assert client.create_database(db_name)
    client.use(db_name)
    assert client.create_table(tbl_name, DATA.schema,
                               partition_info=DATA.partition_info,
                               distribution_info=DATA.distribution_info,
                               bloom_filter_column_list=DATA.bloom_filter_column_list)
    # First half of the files is loaded before the rollup, the rest after.
    length = len(DATA.be_ce_local_file_list)
    files_before = DATA.be_ce_local_file_list[:int(length / 2)]
    for local_file in files_before:
        assert client.stream_load(tbl_name, local_file, max_filter_ratio=0.5)
    sql = "SELECT tinyint_key, SUM(float_value) FROM %s.%s group by tinyint_key" \
        % (db_name, tbl_name)
    expected = client.execute(sql)
    # Build the rollup.
    assert client.create_rollup_table(tbl_name, rollup_name,
                                      DATA.rollup_without_bloom_filter, is_wait=True)
    # Poll once per second, at most 400 times, until the explain output
    # reports the rollup as the index hit by the query.
    for _ in range(400):
        time.sleep(1)
        hit_index = common.get_explain_rollup(client, sql)
        if rollup_name in hit_index:
            break
    assert rollup_name in hit_index, \
        "hit index: %s; expected index: %s" % (hit_index, rollup_name)
    actual = client.execute(sql)
    util.check(actual, expected, force_order=True)
    files_after = DATA.be_ce_local_file_list[int(length / 2):]
    for local_file in files_after:
        assert client.stream_load(tbl_name, local_file, max_filter_ratio=0.5)
    assert client.verify(DATA.expected_data_file_list, tbl_name)
    # Check the rollup data against the same aggregation run on MySQL.
    mysql_sql = ("SELECT tinyint_key, SUM(float_value) "
                 "from %s.all_type group by tinyint_key") % mysql_db
    mysql_result = execute(config.mysql_host, config.mysql_port, mysql_sql,
                           user=config.mysql_user, password=config.mysql_password)
    palo_result = client.execute(sql)
    util.check(palo_result, mysql_result, force_order=True)
    client.clean(db_name)
def teardown_module():
    """Module-level teardown; intentionally does nothing."""