blob: e9b48fc88e04bc38b9b140c17a1433e8b584e872 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Impala tests for DDL statements
import time
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.filesystem_utils import WAREHOUSE, IS_S3
# Checks different statements' effect on last DDL time and last compute stats time.
class TestLastDdlTimeUpdate(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestLastDdlTimeUpdate, cls).add_test_dimensions()
# There is no reason to run these tests using all dimensions.
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format == 'text' and\
v.get_value('table_format').compression_codec == 'none')
if cls.exploration_strategy() == 'core' and not IS_S3:
# Don't run on core. This test is very slow and we are unlikely
# to regress here.
cls.ImpalaTestMatrix.add_constraint(lambda v: False)
# Convenience class to make calls to TestLastDdlTimeUpdate.run_test() shorter by
# storing common arguments as members and substituting table name and HDFS warehouse
# path to the query string.
class TestHelper:
def __init__(self, test_suite, db_name, tbl_name):
self.test_suite = test_suite
self.db_name = db_name
self.tbl_name = tbl_name
self.fq_tbl_name = "%s.%s" % (self.db_name, self.tbl_name)
def expect_no_time_change(self, query):
"""Neither transient_lastDdlTime or impala.lastComputeStatsTime should be
changed by running the query.
The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
"""
self.run_test(query, False, False)
def expect_ddl_time_change(self, query):
"""Running the query should increase transient_lastDdlTime but
not impala.lastComputeStatsTime.
The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
"""
self.run_test(query, True, False)
def expect_stat_time_change(self, query):
"""Running the query should increase impala.lastComputeStatsTime but
not transient_lastDdlTime.
The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
"""
self.run_test(query, False, True)
def run_test(self, query, expect_changed_ddl_time, expect_changed_stats_time):
"""
Runs the query and compares the last ddl/compute stats time before and after
executing the query. If expect_changed_ddl_time or expect_changed_stat_time
is true, then we expect the given table property to be increased, otherwise we
expect it to be unchanged.
The following strings are substituted in the query: "%(TBL)s" and "%(WAREHOUSE)s"
"""
HIVE_LAST_DDL_TIME_PARAM_KEY = "transient_lastDdlTime"
LAST_COMPUTE_STATS_TIME_KEY = "impala.lastComputeStatsTime"
# Get timestamps before executing query.
table = self.test_suite.hive_client.get_table(self.db_name, self.tbl_name)
assert table is not None
beforeDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
beforeStatsTime = table.parameters[LAST_COMPUTE_STATS_TIME_KEY]
# HMS uses a seconds granularity on the last ddl time - sleeping 1100 ms should be
# enough to ensure that the new timestamps are strictly greater than the old ones.
time.sleep (1.1)
self.test_suite.execute_query(query %
{'TBL': self.fq_tbl_name, 'WAREHOUSE': WAREHOUSE})
# Get timestamps after executing query.
table = self.test_suite.hive_client.get_table(self.db_name, self.tbl_name)
afterDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
afterStatsTime = table.parameters[LAST_COMPUTE_STATS_TIME_KEY]
if expect_changed_ddl_time:
# check that the new ddlTime is strictly greater than the old one.
assert long(afterDdlTime) > long(beforeDdlTime)
else:
assert long(afterDdlTime) == long(beforeDdlTime)
if expect_changed_stats_time:
# check that the new statsTime is strictly greater than the old one.
assert long(afterStatsTime) > long(beforeStatsTime)
else:
assert long(afterStatsTime) == long(beforeStatsTime)
def test_alter(self, vector, unique_database):
TBL_NAME = "alter_test_tbl"
FQ_TBL_NAME = unique_database + "." + TBL_NAME
self.execute_query("create external table %s (i int) "
"partitioned by (j int, s string)" % FQ_TBL_NAME)
# compute statistics to fill table property impala.lastComputeStatsTime
self.execute_query("compute stats %s" % FQ_TBL_NAME)
h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME)
# add/drop partitions
h.expect_no_time_change("alter table %(TBL)s add partition (j=1, s='2012')")
h.expect_no_time_change("alter table %(TBL)s add if not exists "
"partition (j=1, s='2012')")
h.expect_no_time_change("alter table %(TBL)s drop partition (j=1, s='2012')")
h.expect_no_time_change("alter table %(TBL)s drop if exists "
"partition (j=2, s='2012')")
# change location of table
h.expect_ddl_time_change("alter table %(TBL)s set location '%(WAREHOUSE)s'")
# change format of table
h.expect_ddl_time_change("alter table %(TBL)s set fileformat textfile")
self.run_common_test_cases(h)
# prepare for incremental statistics tests
self.execute_query("drop stats %s" % FQ_TBL_NAME)
self.execute_query("alter table %s add partition (j=1, s='2012')" % FQ_TBL_NAME)
# compute incremental statistics
h.expect_stat_time_change("compute incremental stats %(TBL)s")
h.expect_stat_time_change("compute incremental stats %(TBL)s "
"partition (j=1, s='2012')")
# drop incremental statistics
h.expect_no_time_change("drop incremental stats %(TBL)s partition (j=1, s='2012')")
# prepare for table sample statistics tests
self.execute_query(
"alter table %s set tblproperties ('impala.enable.stats.extrapolation'='true')"
% FQ_TBL_NAME)
# compute sampled statistics
h.expect_stat_time_change("compute stats %(TBL)s tablesample system(20)")
def test_insert(self, vector, unique_database):
TBL_NAME = "insert_test_tbl"
FQ_TBL_NAME = unique_database + "." + TBL_NAME
self.execute_query("create external table %s (i int) "
"partitioned by (j int, s string)" % FQ_TBL_NAME)
# initialize compute stats time
self.execute_query("compute stats %s" % FQ_TBL_NAME)
h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME)
# static partition insert
h.expect_no_time_change("insert into %(TBL)s partition(j=1, s='2012') select 10")
# dynamic partition insert
h.expect_no_time_change("insert into %(TBL)s partition(j, s) select 10, 2, '2013'")
# dynamic partition insert changing no partitions (empty input)
h.expect_no_time_change("insert into %(TBL)s partition(j, s) "
"select * from (select 10 as i, 2 as j, '2013' as s) as t "
"where t.i < 10")
# dynamic partition insert modifying an existing partition
h.expect_no_time_change("insert into %(TBL)s partition(j, s) select 20, 1, '2012'")
def test_kudu(self, vector, unique_database):
TBL_NAME = "kudu_test_tbl"
FQ_TBL_NAME = unique_database + "." + TBL_NAME
self.execute_query("create table %s (i int primary key) "
"partition by hash(i) partitions 3 stored as kudu" % FQ_TBL_NAME)
# initialize last compute stats time
self.execute_query("compute stats %s" % FQ_TBL_NAME)
h = TestLastDdlTimeUpdate.TestHelper(self, unique_database, TBL_NAME)
# insert
h.expect_no_time_change("insert into %s values (1)" % FQ_TBL_NAME)
self.run_common_test_cases(h)
# Tests that should behave the same with HDFS and Kudu tables.
def run_common_test_cases(self, test_helper):
h = test_helper
# rename columns
h.expect_ddl_time_change("alter table %(TBL)s change column i k int")
h.expect_ddl_time_change("alter table %(TBL)s change column k i int")
# change table property
h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('a'='b')")
# changing table statistics manually
h.expect_ddl_time_change("alter table %(TBL)s set tblproperties ('numRows'='1')")
# changing column statistics manually
h.expect_no_time_change("alter table %(TBL)s set column stats i ('numdvs'='3')")
# compute statistics
h.expect_stat_time_change("compute stats %(TBL)s")
# compute statistics for a single column
h.expect_stat_time_change("compute stats %(TBL)s (i)")
# drop statistics
h.expect_no_time_change("drop stats %(TBL)s")
# invalidate metadata and reload table
self.execute_query("invalidate metadata %s" % h.fq_tbl_name)
# run any query to reload the table
h.expect_no_time_change("describe %(TBL)s")