blob: f30d5cb825b2f2167f443b7997a1d5f299334364 [file] [log] [blame]
#!/bin/env python
# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
############################################################################
#
# @file test_sys_load.py
# @date 2017-04-10 15:02:22
# @brief This file is a test file for palo small load in complex scenarios.
#
#############################################################################
"""
测试hll data type
"""
import numpy
import sys
import time
import pytest
from data import hll_load as DATA
sys.path.append("../")
from lib import palo_config
from lib import palo_client
from lib import util
import palo_logger
import palo_exception
# Shared Palo client; stays None until setup_module() assigns it.
client = None
# Connection settings (fe_host, fe_query_port, ...) read by setup_module().
config = palo_config.config
# Local data file paths (not referenced elsewhere in the visible part of this file).
local_data_file = '../hdfs/data/qe/xaaa'
local_data_file1 = '../hdfs/data/qe/baseall.txt'
# Remote data file paths built by palo_config.gen_remote_file_path;
# hdfs_baseall and hdfs_test are the two files loaded by pull_load_init().
hdfs_test = palo_config.gen_remote_file_path("/qe/xaaa")
hdfs_baseall = palo_config.gen_remote_file_path("/qe/baseall.txt")
hdfs_x = palo_config.gen_remote_file_path("/qe/x0")
# Broker description handed to client.batch_load() in pull_load_init().
broker_info = palo_config.broker_info
# NOTE(review): repeats the assignment at the top of this block; redundant.
client = None
# Upper bound that check_union_agg() asserts every relative deviation against.
# NOTE(review): a bound of 100000 can hardly ever be exceeded -- confirm it is intended.
HLL_ERROR = 100000
# Logger and exception objects
LOG = palo_logger.Logger.getLogger()
L = palo_logger.StructedLogMessage
PaloClientException = palo_exception.PaloException
# Cluster name passed to the load-job queries in wait_all_end().
CLUSTER = 'default_cluster'
test_load_db = 'test_load_db'
# Table the reference queries (ndv / count(distinct)) are run against.
compare = 'test_query_qa.test'
# Expected values handed to check_union_agg(): the *_1 tuple holds the first
# five entries of compare_distinct_count, the *_2 tuple the remaining six.
# Presumably these are the exact distinct counts of k1..k11 in `compare` -- verify.
compare_distinct_count = ((255, 39388, 60008, 60010, 9484, 19815, 19767, 60013, 59956, 30579, 9),)
compare_distinct_count_1 = ((255, 39388, 60008, 60010, 9484),)
compare_distinct_count_2 = ((19815, 19767, 60013, 59956, 30579, 9),)
# hll_union_agg over the HLL columns of a whole table; %s is the table name.
# The column list is split in two statements (k1..k5 and k6..k11).
hll_union_sql_1 = 'SELECT hll_union_agg(k1_hll), hll_union_agg(k2_hll), hll_union_agg(k3_hll), \
hll_union_agg(k4_hll), hll_union_agg(k5_hll) FROM %s'
hll_union_sql_2 = 'SELECT hll_union_agg(k6_hll), hll_union_agg(k7_hll), hll_union_agg(k8_hll), \
hll_union_agg(k9_hll), hll_union_agg(k10_hll), hll_union_agg(k11_hll) FROM %s'
# ndv() over the plain columns of a whole table; %s is the table name.
ndv_sql_1 = 'SELECT ndv(k1), ndv(k2), ndv(k3), ndv(k4), ndv(k5) FROM %s'
ndv_sql_2 = 'SELECT ndv(k6), ndv(k7), ndv(k8), ndv(k9), ndv(k10), ndv(k11) FROM %s'
# Row-wise hll_cardinality(); placeholders: table name, ORDER BY column.
hll_cardinality_1 = 'SELECT hll_cardinality(k1_hll), \
hll_cardinality(k2_hll), hll_cardinality(k3_hll), hll_cardinality(k4_hll), \
hll_cardinality(k5_hll) FROM %s ORDER BY %s'
hll_cardinality_2 = 'SELECT hll_cardinality(k6_hll), hll_cardinality(k7_hll), \
hll_cardinality(k8_hll), hll_cardinality(k9_hll), hll_cardinality(k10_hll), \
hll_cardinality(k11_hll) FROM %s ORDER BY %s'
# Grouped ndv(); placeholders: table name, GROUP BY column, ORDER BY column.
ndv_cardinality_1 = 'SELECT ndv(k1), ndv(k2), ndv(k3), ndv(k4), ndv(k5) FROM %s GROUP BY %s \
ORDER BY %s '
ndv_cardinality_2 = 'SELECT ndv(k6), ndv(k7), ndv(k8), ndv(k9), ndv(k10), ndv(k11) FROM %s \
GROUP BY %s ORDER BY %s'
# Grouped count(distinct); placeholders: table name, GROUP BY column, ORDER BY column.
distinct_cardinality_1 = 'SELECT count(distinct k1), count(distinct k2), count(distinct k3), \
count(distinct k4),count(distinct k5) FROM %s GROUP BY %s ORDER BY %s'
distinct_cardinality_2 = 'SELECT count(distinct k6), \
count(distinct k7), count(distinct k8), count(distinct k9), \
count(distinct k10), count(distinct k11) FROM %s GROUP BY %s ORDER BY %s'
def setup_module():
    """
    Create the module-wide Palo client from the FE settings in ``config``
    and store it in the global ``client``.
    """
    global client
    fe_host = config.fe_host
    fe_query_port = config.fe_query_port
    credentials = {
        'user': config.fe_user,
        'password': config.fe_password,
        'http_port': config.fe_http_port,
    }
    client = palo_client.get_client(fe_host, fe_query_port, **credentials)
def wait_end(database_name):
    """
    Poll the load jobs of ``database_name`` until the last listed job is done.

    The job list is fetched once per second; only the last row returned by
    ``client.get_load_job_list`` is inspected, and its third field is read as
    the job state.

    Args:
        database_name: database whose load jobs are polled
    Raises:
        AssertionError: if the job ends in any state other than FINISHED
        IndexError: if the job list is empty
    """
    print('waiting for load...')
    terminal_states = ("FINISHED", "CANCELLED")
    while True:
        job_list = client.get_load_job_list(database_name=database_name)
        state = job_list[-1][2]
        if state in terminal_states:
            print(state)
            # Leave immediately; sleeping once more after the job is done
            # only slows the test down.
            break
        time.sleep(1)
    assert state == "FINISHED", 'load job ended in state %s' % state
def wait_all_end(database_name):
    """
    Wait until every load job of ``database_name`` reaches a terminal state.

    The job list is fetched once; each job that is not yet FINISHED or
    CANCELLED is then polled by label once per second. In each job row the
    second field is read as the label and the third as the state.

    Args:
        database_name: database whose load jobs are waited for
    Raises:
        AssertionError: if at least one job ended up CANCELLED
    """
    terminal_states = ("FINISHED", "CANCELLED")
    cancelled_labels = []
    job_list = client.get_load_job_list(database_name=database_name, cluster_name=CLUSTER)
    for job in job_list:
        label = job[1]
        state = job[2]
        while state not in terminal_states:
            time.sleep(1)
            state = client.get_load_job_state(label, database_name, cluster_name=CLUSTER)
        if state == "CANCELLED":
            cancelled_labels.append(label)
    # Name the offending jobs so a failure can be traced without re-running.
    assert not cancelled_labels, 'cancelled load jobs: %s' % cancelled_labels
def execute(line):
    """Echo the statement to stdout, run it on the shared client, return its result."""
    print(line)
    return client.execute(line)
def bulk_load(client, table_family_name, load_label, data_file, max_filter_ratio=0,
              column_name_list=None, timeout=100, database_name=None, backend_id=None,
              is_wait=False, be_user="root", be_password="", hll_column=None):
    """
    Small-batch load (placeholder).

    The function has no implementation: every argument is ignored and
    None is returned.
    """
    return None
def init(db_name, table_name, create_sql, pull_load=True):
    """
    Recreate the database, optionally create and load the test table,
    then wait for all load jobs of the database.

    Args:
        db_name: database to drop (if present), create and switch to
        table_name: base table name handed to pull_load_init
        create_sql: CREATE TABLE template handed to pull_load_init
        pull_load: when true, create and load a table through pull_load_init
    Returns:
        list holding the table name returned by pull_load_init, or an empty
        list when pull_load is false
    """
    # Start from a clean database.
    client.execute('drop database if exists %s' % db_name)
    client.create_database(db_name)
    client.use(db_name)
    created_tables = [pull_load_init(table_name, db_name, create_sql)] if pull_load else []
    wait_all_end(db_name)
    return created_tables
def pull_load_init(table_name, db_name, create_sql):
    """
    Create ``<table_name>_pull`` from ``create_sql`` and submit two batch
    loads into it (hdfs_baseall first, then hdfs_test).

    Returns the name of the created table. The loads are only submitted
    here; nothing in this function waits for them.
    """
    target_table = table_name + '_pull'
    create_result = client.execute(create_sql % target_table)
    assert create_result == ()
    # The broker name is looked up but not used by the loads below
    # (they pass broker_info); the lookup is kept so the call and the
    # index into its result still happen.
    broker_list = client.get_broker_list()
    broker = broker_list[0][0]
    for remote_file in (hdfs_baseall, hdfs_test):
        set_list = list(DATA.columns_func)
        load_info = palo_client.LoadDataInfo(remote_file, target_table,
                                             column_name_list=DATA.data_columns,
                                             set_list=set_list)
        client.batch_load(util.get_label(), load_info, database_name=db_name,
                          broker=broker_info)
    return target_table
def test_load_k1():
    """
    {
    "title": "test_sys_hll_load.test_load_k1",
    "describe": "k1 is the only key, load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k1 is the only key, load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k1_hll)
    p_key = 'k1'

    # Per-key cardinality: count(distinct) on the reference table is the
    # baseline handed to check_mat for both hll_cardinality() and ndv().
    exact_rows_1 = client.execute(distinct_cardinality_1 % (compare, p_key, p_key))
    exact_rows_2 = client.execute(distinct_cardinality_2 % (compare, p_key, p_key))
    column_groups = (
        (hll_cardinality_1, ndv_cardinality_1, exact_rows_1),
        (hll_cardinality_2, ndv_cardinality_2, exact_rows_2),
    )
    for loaded in loaded_tables:
        for hll_tpl, ndv_tpl, exact_rows in column_groups:
            hll_rows = client.execute(hll_tpl % (loaded, p_key))
            ndv_rows = client.execute(ndv_tpl % (loaded, p_key, p_key))
            check_mat(hll_rows, exact_rows)
            check_mat(ndv_rows, exact_rows)

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k2():
    """
    {
    "title": "test_sys_hll_load.test_load_k2",
    "describe": "k2 is the only key, load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k2 is the only key, load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k2_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k2'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k3():
    """
    {
    "title": "test_sys_hll_load.test_load_k3",
    "describe": "k3 is the only key, load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k3 is the only key, load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k3_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k3'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k4():
    """
    {
    "title": "test_sys_hll_load.test_load_k4",
    "describe": "k4 is the only key, test load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k4 is the only key, test load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k4_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k4'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k5():
    """
    {
    "title": "test_sys_hll_load.test_load_k5",
    "describe": "k5 is the only key, test load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k5 is the only key, test load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k5_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k5'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k6():
    """
    {
    "title": "test_sys_hll_load.test_load_k6",
    "describe": "k6 is the only key, test load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k6 is the only key, test load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k6_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k6'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k7():
    """
    {
    "title": "test_sys_hll_load.test_load_k7",
    "describe": "k7 is the only key, test load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k7 is the only key, test load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k7_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k7'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k10():
    """
    {
    "title": "test_sys_hll_load.test_load_k10",
    "describe": "k10 is the only key, test load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k10 is the only key, test load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k10_hll)
    # p_key is only needed by the per-key hll_cardinality/ndv comparison,
    # which is disabled for this key (test_load_k1 has the active form).
    p_key = 'k10'

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def test_load_k11():
    """
    {
    "title": "test_sys_hll_load.test_load_k11",
    "describe": "k11 is the only key, test load and aggregate",
    "tag": "function,p1"
    }
    """
    # NOTE(review): the JSON docstring is left untouched -- it looks like
    # machine-readable case metadata; confirm before editing it.
    # Scenario: k11 is the only key, test load and aggregate.
    db_name, table_name, _ = util.gen_name_list()
    loaded_tables = init(db_name, table_name, DATA.k11_hll)
    p_key = 'k11'

    # Per-key cardinality: count(distinct) on the reference table is the
    # baseline handed to check_mat for both hll_cardinality() and ndv().
    exact_rows_1 = client.execute(distinct_cardinality_1 % (compare, p_key, p_key))
    exact_rows_2 = client.execute(distinct_cardinality_2 % (compare, p_key, p_key))
    column_groups = (
        (hll_cardinality_1, ndv_cardinality_1, exact_rows_1),
        (hll_cardinality_2, ndv_cardinality_2, exact_rows_2),
    )
    for loaded in loaded_tables:
        for hll_tpl, ndv_tpl, exact_rows in column_groups:
            hll_rows = client.execute(hll_tpl % (loaded, p_key))
            ndv_rows = client.execute(ndv_tpl % (loaded, p_key, p_key))
            check_mat(hll_rows, exact_rows)
            check_mat(ndv_rows, exact_rows)

    # Whole-table union: ndv() on the reference table first, then
    # hll_union_agg() on every loaded table, all against the fixed counts.
    expected_1 = compare_distinct_count_1[0]
    expected_2 = compare_distinct_count_2[0]
    reference_rows_1 = execute(ndv_sql_1 % compare)
    reference_rows_2 = execute(ndv_sql_2 % compare)
    check_union_agg(reference_rows_1[0], expected_1)
    check_union_agg(reference_rows_2[0], expected_2)
    for loaded in loaded_tables:
        print('hll union check, table: ', loaded)
        union_rows_1 = execute(hll_union_sql_1 % loaded)
        union_rows_2 = execute(hll_union_sql_2 % loaded)
        check_union_agg(union_rows_1[0], expected_1)
        check_union_agg(union_rows_2[0], expected_2)
    client.clean(db_name)
def check_mat(hll_result, correct_result):
    """
    Print per-column relative-error statistics of ``hll_result`` against
    ``correct_result``.

    Both arguments are converted to integer matrices, so they must be row
    sequences of equal shape. Only the number of rows is verified; the
    max / min / mean deviation per column is printed, not asserted.

    Args:
        hll_result: rows produced by the query under test
        correct_result: rows of the reference query
    Raises:
        AssertionError: if the two results differ in their number of rows
    """
    print('the length is : ', len(hll_result))
    if len(hll_result) != len(correct_result):
        print('hll table length is %s' % len(hll_result))
        print('actual length is %s' % len(correct_result))
        # Raised explicitly so the check is not stripped when running with -O.
        raise AssertionError('result len is diff')
    # numpy.mat was removed in NumPy 2.0; numpy.asmatrix is the function it
    # aliased and is available in both NumPy 1.x and 2.x.
    hll_mat = numpy.asmatrix(hll_result, dtype='int')
    correct_mat = numpy.asmatrix(correct_result, dtype='int')
    minus_mat = abs(hll_mat - correct_mat)
    # Float copy of the reference values so the division is not integer-typed.
    reference = numpy.asmatrix(correct_result, dtype='float64')
    deviation = minus_mat / reference
    print('每列的最大误差:\n %s' % numpy.max(deviation, axis=0))
    print('每列的最小误差:\n %s' % numpy.min(deviation, axis=0))
    print('每列的平均误差:\n %s' % numpy.mean(deviation, axis=0))
def check_union_agg(hll_result, correct_result):
    """
    Compare one result row with the expected row, column by column.

    For every position the relative deviation is computed with
    compute_deviation, printed, and asserted to be below HLL_ERROR.

    Args:
        hll_result: row of values under test (each converted with int())
        correct_result: row of expected values (each converted with int())
    Raises:
        AssertionError: if the rows differ in length or a deviation reaches
            HLL_ERROR
    """
    # A length mismatch used to skip every check and let the test pass
    # without verifying anything; fail loudly instead.
    assert len(hll_result) == len(correct_result), \
        'column count mismatch: got %s, expected %s' % (len(hll_result), len(correct_result))
    for i, (measured, expected) in enumerate(zip(hll_result, correct_result)):
        deviation = compute_deviation(int(measured), int(expected))
        # presumably every second entry of DATA.columns is the column name
        # belonging to position i -- verify against data/hll_load
        print('%s deviation: %s' % (DATA.columns[i * 2], deviation))
        assert deviation < HLL_ERROR, \
            'deviation %s is not below %s' % (deviation, HLL_ERROR)
def compute_deviation(actual, computed):
    """
    Return the relative deviation ``abs(actual - computed) / actual``.

    Args:
        actual: actual value; the difference is divided by it
        computed: computed value
    Returns:
        the deviation as a float. When ``actual`` is zero the quotient is
        undefined: 0.0 is returned if ``computed`` is zero as well,
        otherwise ``float('inf')``.
    """
    difference = abs(actual - computed)
    if actual == 0:
        # Avoid ZeroDivisionError: equal values do not deviate, anything
        # else deviates without bound.
        return 0.0 if difference == 0 else float('inf')
    return difference / float(actual)
# When run as a script: create the client, then run only the k2 case.
if __name__ == '__main__':
    setup_module()
    test_load_k2()