blob: c6bd8c437008e9a68217082db091642d67ed5b77 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from os import path
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import (
create_exec_option_dimension,
create_single_exec_option_dimension,
create_uncompressed_text_dimension)
class TestStatsExtrapolation(ImpalaTestSuite):
"""Test stats extrapolation and compute stats tablesample. Stats extrapolation is
enabled via table property and not via the impalad startup flag so these tests can be
run as regular tests (non-custom-cluster) and in parallel with other tests."""
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestStatsExtrapolation, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))
def test_stats_extrapolation(self, vector, unique_database):
vector.get_value('exec_option')['num_nodes'] = 1
vector.get_value('exec_option')['explain_level'] = 2
self.run_test_case('QueryTest/stats-extrapolation', vector, unique_database)
def test_compute_stats_tablesample(self, vector, unique_database):
"""COMPUTE STATS TABLESAMPLE is inherently non-deterministic due to its use of
SAMPLED_NDV() so we test it specially. The goal of this test is to ensure that
COMPUTE STATS TABLESAMPLE computes in-the-right-ballpark stats and successfully
stores them in the HMS."""
# Since our test tables are small, set the minimum sample size to 0 to make sure
# we exercise the sampling code paths.
self.client.execute("set compute_stats_min_sample_size=0")
# Test partitioned table.
part_test_tbl = unique_database + ".alltypes"
self.clone_table("functional.alltypes", part_test_tbl, True, vector)
# Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
part_test_tbl_base = unique_database + ".alltypes_base"
self.clone_table(part_test_tbl, part_test_tbl_base, True, vector)
self.client.execute("compute stats {0}".format(part_test_tbl_base))
# Enable stats extrapolation on both tables to match SHOW output.
self.__set_extrapolation_tblprop(part_test_tbl)
self.__set_extrapolation_tblprop(part_test_tbl_base)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 1, 3)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 10, 7)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 20, 13)
self.__run_sampling_test(part_test_tbl, "", part_test_tbl_base, 100, 99)
# Test unpartitioned table.
nopart_test_tbl = unique_database + ".alltypesnopart"
self.client.execute("create table {0} as select * from functional.alltypes"\
.format(nopart_test_tbl))
# Clone to use as a baseline. We run the regular COMPUTE STATS on this table.
nopart_test_tbl_base = unique_database + ".alltypesnopart_base"
self.clone_table(nopart_test_tbl, nopart_test_tbl_base, False, vector)
self.client.execute("compute stats {0}".format(nopart_test_tbl_base))
# Enable stats extrapolation on both tables to match SHOW output.
self.__set_extrapolation_tblprop(nopart_test_tbl)
self.__set_extrapolation_tblprop(nopart_test_tbl_base)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 1, 3)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 10, 7)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 20, 13)
self.__run_sampling_test(nopart_test_tbl, "", nopart_test_tbl_base, 100, 99)
# Test empty table.
empty_test_tbl = unique_database + ".empty_tbl"
self.clone_table("functional.alltypes", empty_test_tbl, False, vector)
self.__set_extrapolation_tblprop(empty_test_tbl)
self.__run_sampling_test(empty_test_tbl, "", empty_test_tbl, 10, 7)
# Test column subset.
column_subset_tbl = unique_database + ".column_subset"
columns = "(int_col, string_col)"
self.clone_table("functional.alltypes", column_subset_tbl, True, vector)
self.__set_extrapolation_tblprop(column_subset_tbl)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 1, 3)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 10, 7)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 20, 13)
self.__run_sampling_test(column_subset_tbl, columns, part_test_tbl_base, 100, 99)
# Test no columns.
no_column_tbl = unique_database + ".no_columns"
columns = "()"
self.clone_table("functional.alltypes", no_column_tbl, True, vector)
self.__set_extrapolation_tblprop(no_column_tbl)
self.__run_sampling_test(no_column_tbl, columns, part_test_tbl_base, 10, 7)
# Test wide table. Should not crash or error. This takes a few minutes so restrict
# to exhaustive.
if self.exploration_strategy() == "exhaustive":
wide_test_tbl = unique_database + ".wide"
self.clone_table("functional.widetable_1000_cols", wide_test_tbl, False, vector)
self.__set_extrapolation_tblprop(wide_test_tbl)
self.client.execute(
"compute stats {0} tablesample system(10)".format(wide_test_tbl))
def __set_extrapolation_tblprop(self, tbl):
"""Alters the given table to enable stats extrapolation via tblproperty."""
self.client.execute("alter table {0} set "\
"tblproperties('impala.enable.stats.extrapolation'='true')".format(tbl))
def __run_sampling_test(self, tbl, cols, expected_tbl, perc, seed):
"""Drops stats on 'tbl' and then runs COMPUTE STATS TABLESAMPLE on 'tbl' with the
given column restriction clause, sampling percent and random seed. Checks that
the resulting table and column stats are reasonably close to those of
'expected_tbl'."""
self.client.execute("drop stats {0}".format(tbl))
self.client.execute("compute stats {0}{1} tablesample system ({2}) repeatable ({3})"\
.format(tbl, cols, perc, seed))
self.__check_table_stats(tbl, expected_tbl)
self.__check_column_stats(tbl, expected_tbl)
def __check_table_stats(self, tbl, expected_tbl):
"""Checks that the row counts reported in SHOW TABLE STATS on 'tbl' are within 2x
of those reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
actual = self.client.execute("show table stats {0}".format(tbl))
expected = self.client.execute("show table stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
rows_col_idx = col_names.index("#ROWS")
extrap_rows_col_idx = col_names.index("EXTRAP #ROWS")
for i in xrange(0, len(actual.data)):
act_cols = actual.data[i].split("\t")
exp_cols = expected.data[i].split("\t")
assert int(exp_cols[rows_col_idx]) >= 0
self.appx_equals(\
int(act_cols[extrap_rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
# Only the table-level row count is stored. The partition row counts
# are extrapolated.
if act_cols[0] == "Total":
self.appx_equals(
int(act_cols[rows_col_idx]), int(exp_cols[rows_col_idx]), 2)
elif len(actual.data) > 1:
# Partition row count is expected to not be set.
assert int(act_cols[rows_col_idx]) == -1
def __check_column_stats(self, tbl, expected_tbl):
"""Checks that the NDVs in SHOW COLUMNS STATS on 'tbl' are within 2x of those
reported for 'expected_tbl'. Assumes that COMPUTE STATS was previously run
on 'expected_table' and that COMPUTE STATS TABLESAMPLE was run on 'tbl'."""
actual = self.client.execute("show column stats {0}".format(tbl))
expected = self.client.execute("show column stats {0}".format(expected_tbl))
assert len(actual.data) == len(expected.data)
assert len(actual.schema.fieldSchemas) == len(expected.schema.fieldSchemas)
col_names = [fs.name.upper() for fs in actual.schema.fieldSchemas]
ndv_col_idx = col_names.index("#DISTINCT VALUES")
for i in xrange(0, len(actual.data)):
act_cols = actual.data[i].split("\t")
exp_cols = expected.data[i].split("\t")
assert int(exp_cols[ndv_col_idx]) >= 0
self.appx_equals(int(act_cols[ndv_col_idx]), int(exp_cols[ndv_col_idx]), 2)