blob: b1bcfbb7e7502699e6f633b3b40a44685be02725 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Impala tests for DDL statements
import time
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.filesystem_utils import WAREHOUSE, IS_S3
# Checks that ALTER and INSERT statements update the last DDL time of the modified table.
class TestLastDdlTimeUpdate(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestLastDdlTimeUpdate, cls).add_test_dimensions()
# There is no reason to run these tests using all dimensions.
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format == 'text' and\
v.get_value('table_format').compression_codec == 'none')
if cls.exploration_strategy() == 'core' and not IS_S3:
# Don't run on core. This test is very slow and we are unlikely
# to regress here.
cls.ImpalaTestMatrix.add_constraint(lambda v: False)
def test_alter(self, vector, unique_database):
TBL_NAME = "alter_test_tbl"
FQ_TBL_NAME = unique_database + "." + TBL_NAME
self.execute_query("create external table %s (i int) "
"partitioned by (j int, s string)" % FQ_TBL_NAME)
# add/drop partitions
self.run_test("alter table %s add partition (j=1, s='2012')" % FQ_TBL_NAME,
unique_database, TBL_NAME, False)
self.run_test("alter table %s add if not exists "
"partition (j=1, s='2012')" % FQ_TBL_NAME,
unique_database, TBL_NAME, False)
self.run_test("alter table %s drop partition (j=1, s='2012')" % FQ_TBL_NAME,
unique_database, TBL_NAME, False)
self.run_test("alter table %s drop if exists "
"partition (j=2, s='2012')" % FQ_TBL_NAME,
unique_database, TBL_NAME, False)
# rename columns
self.run_test("alter table %s change column i k int" % FQ_TBL_NAME,
unique_database, TBL_NAME, True)
self.run_test("alter table %s change column k i int" % FQ_TBL_NAME,
unique_database, TBL_NAME, True)
# change location of table
self.run_test("alter table %s set location "
"'%s'" % (FQ_TBL_NAME, WAREHOUSE),
unique_database, TBL_NAME, True)
# change format of table
self.run_test("alter table %s set fileformat textfile" % FQ_TBL_NAME,
unique_database, TBL_NAME, True)
def test_insert(self, vector, unique_database):
TBL_NAME = "insert_test_tbl"
FQ_TBL_NAME = unique_database + "." + TBL_NAME
self.execute_query("create external table %s (i int) "
"partitioned by (j int, s string)" % FQ_TBL_NAME)
# static partition insert
self.run_test("insert into %s partition(j=1, s='2012') "
"select 10" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
# dynamic partition insert
self.run_test("insert into %s partition(j, s) "
"select 10, 2, '2013'" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
# dynamic partition insert changing no partitions (empty input)
self.run_test("insert into %s partition(j, s) "
"select * from (select 10 as i, 2 as j, '2013' as s) as t "
"where t.i < 10" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
# dynamic partition insert modifying an existing partition
self.run_test("insert into %s partition(j, s) "
"select 20, 1, '2012'" % FQ_TBL_NAME, unique_database, TBL_NAME, False)
def run_test(self, query, db_name, table_name, expect_changed):
"""
Runs the given query (expected to be an ALTER or INSERT statement)
and compares the last ddl time before and after executing the query.
If expect_change is true then we expect the last ddl time to increase,
otherwise we expect no change.
"""
HIVE_LAST_DDL_TIME_PARAM_KEY = "transient_lastDdlTime"
# Get last DDL time before executing query.
table = self.hive_client.get_table(db_name, table_name)
assert table is not None
beforeDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
# Sleep for 2s to make sure the new ddlTime is strictly greater than the old one.
# Hive uses a seconds granularity on the last ddl time.
time.sleep (2)
self.execute_query(query)
# Get last ddl time after executing query .
table = self.hive_client.get_table(db_name, table_name)
afterDdlTime = table.parameters[HIVE_LAST_DDL_TIME_PARAM_KEY]
if expect_changed:
# check that the new ddlTime is strictly greater than the old one.
assert long(afterDdlTime) > long(beforeDdlTime)
else:
assert long(afterDdlTime) == long(beforeDdlTime)