# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
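
# Tests for refreshing a single partition of a table:
#   REFRESH <table> PARTITION (<partition spec>)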
from subprocess import check_call
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_dimensions import create_uncompressed_text_dimension
from tests.common.skip import SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, SkipIfLocal
from tests.util.filesystem_utils import get_fs_path
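

# These tests drive Hive via run_stmt_in_hive(), so skip them in environments
# where Hive is not available.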
@SkipIfS3.hive
@SkipIfABFS.hive
@SkipIfADLS.hive
@SkipIfIsilon.hive
@SkipIfLocal.hive
class TestRefreshPartition(ImpalaTestSuite):
"""
This class tests the functionality to refresh a partition individually
for a table in HDFS
"""

  @classmethod
  def get_workload(cls):
return 'functional-query'

  @classmethod
def add_test_dimensions(cls):
super(TestRefreshPartition, cls).add_test_dimensions()
# There is no reason to run these tests using all dimensions.
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
cls.ImpalaTestMatrix.add_dimension(
create_uncompressed_text_dimension(cls.get_workload()))

  def test_refresh_partition_num_rows(self, vector, unique_database):
    """Refreshing a partition should not change its numRows stat."""
# Create a partitioned table and add data to it.
tbl = unique_database + ".t1"
self.client.execute("create table %s(a int) partitioned by (b int)" % tbl)
self.client.execute("insert into %s partition(b=1) values (1)" % tbl)
# Compute stats on tbl. It should populate the partition num rows.
self.client.execute("compute stats %s" % tbl)
result = self.client.execute("show partitions %s" % tbl)
# Format: partition/#Rows/#Files (first 3 entries)
assert result.get_data().startswith("1\t1\t1"),\
"Incorrect partition stats %s" % result.get_data()
# Add another file to the same partition using hive.
self.run_stmt_in_hive("insert into table %s partition (b=1) values (2)" % tbl)
# Make sure Impala still sees a single row.
assert "1" == self.client.execute("select count(*) from %s" % tbl).get_data()
    # Refresh the partition and make sure the new row is visible.
self.client.execute("refresh %s partition (b=1)" % tbl)
assert "2" == self.client.execute("select count(*) from %s" % tbl).get_data()
    # Make sure the partition's numRows stat is unchanged and still 1, but
    # #Files is updated to 2.
result = self.client.execute("show partitions %s" % tbl)
assert result.get_data().startswith("1\t1\t2"),\
"Incorrect partition stats %s" % result.get_data()
# Do a full table refresh and it should still remain the same.
self.client.execute("refresh %s" % tbl)
result = self.client.execute("show partitions %s" % tbl)
assert result.get_data().startswith("1\t1\t2"),\
"Incorrect partition stats %s" % result.get_data()

  def test_add_hive_partition_and_refresh(self, vector, unique_database):
    """
    A partition added in Hive becomes visible in Impala after the partition
    is refreshed.
    """
table_name = unique_database + '.' + "partition_test_table"
self.client.execute(
'create table %s (x int) partitioned by (y int, z int)' %
table_name)
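    # The new table starts with no partitions.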
assert [] == self.get_impala_partition_info(table_name, 'y', 'z')
self.run_stmt_in_hive(
'alter table %s add partition (y=333, z=5309)' % table_name)
# Make sure Impala can't see the partition yet
assert [] == self.get_impala_partition_info(table_name, 'y', 'z')
self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
# Impala can see the partition
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
# Impala's refresh didn't alter Hive's knowledge of the partition
assert ['y=333/z=5309'] == self.hive_partition_names(table_name)

  def test_drop_hive_partition_and_refresh(self, vector, unique_database):
    """
    A partition dropped in Hive is removed in Impala as well after the
    partition is refreshed.
    """
table_name = unique_database + '.' + "partition_test_table"
self.client.execute(
'create table %s (x int) partitioned by (y int, z int)' %
table_name)
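    # Add the partition through Impala so it is loaded in Impala's metadata.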
self.client.execute(
'alter table %s add partition (y=333, z=5309)' % table_name)
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
self.run_stmt_in_hive(
'alter table %s drop partition (y=333, z=5309)' % table_name)
# Make sure Impala can still see the partition
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
    # Impala now sees that the partition is gone
assert [] == self.get_impala_partition_info(table_name, 'y', 'z')
# Impala's refresh didn't alter Hive's knowledge of the partition
assert [] == self.hive_partition_names(table_name)

  def test_add_data_and_refresh(self, vector, unique_database, cluster_properties):
    """
    Data added through Hive is visible in Impala after the partition is
    refreshed.
    """
table_name = unique_database + '.' + "partition_test_table"
self.client.execute(
'create table %s (x int) partitioned by (y int, z int)' %
table_name)
self.client.execute(
'alter table %s add partition (y=333, z=5309)' % table_name)
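    # The newly added partition is empty.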
result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == ['0']
self.run_stmt_in_hive(
'insert into table %s partition (y=333, z=5309) values (2)'
% table_name)
    # Make sure it still shows the same result before refreshing.
result = self.client.execute("select count(*) from %s" % table_name)
valid_counts = [0]
if cluster_properties.is_event_polling_enabled():
      # With event polling enabled, HMS notifications may have already made the
      # added partition visible (a benign race).
valid_counts.append(1)
assert int(result.data[0]) in valid_counts
self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
assert '2\t333\t5309' == self.client.execute(
'select * from %s' % table_name).get_data()

  def test_refresh_invalid_partition(self, vector, unique_database):
    """
    Refreshing a partition that does not exist modifies nothing in either
    Impala or Hive.
    """
table_name = unique_database + '.' + "partition_test_table"
self.client.execute(
'create table %s (x int) partitioned by (y int, z int)' %
table_name)
self.client.execute(
'alter table %s add partition (y=333, z=5309)' % table_name)
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
assert ['y=333/z=5309'] == self.hive_partition_names(table_name)
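    # Refresh a partition spec that does not exist; this should be a no-op.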
self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
assert ['y=333/z=5309'] == self.hive_partition_names(table_name)

  def test_remove_data_and_refresh(self, vector, unique_database):
    """
    Data removed through Hive is no longer visible in Impala after the
    partition is refreshed.
    """
expected_error = 'Error(2): No such file or directory'
table_name = unique_database + '.' + "partition_test_table"
self.client.execute(
'create table %s (x int) partitioned by (y int, z int)' %
table_name)
self.client.execute(
'alter table %s add partition (y=333, z=5309)' % table_name)
self.client.execute(
'insert into table %s partition (y=333, z=5309) values (2)' % table_name)
assert '2\t333\t5309' == self.client.execute(
'select * from %s' % table_name).get_data()
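    # Drop the partition through Hive. Impala's cached metadata still points at
    # the deleted data files until the partition is refreshed.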
self.run_stmt_in_hive(
'alter table %s drop partition (y=333, z=5309)' % table_name)
# Query the table. With file handle caching, this may not produce an error,
# because the file handles are still open in the cache. If the system does
# produce an error, it should be the expected error.
try:
self.client.execute("select * from %s" % table_name)
except ImpalaBeeswaxException as e:
assert expected_error in str(e)
self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == ['0']

  def test_add_delete_data_to_hdfs_and_refresh(self, vector, unique_database):
    """
    Data files added or deleted directly in HDFS are reflected in Impala
    after the partition is refreshed.
    """
table_name = unique_database + '.' + "partition_test_table"
table_location = get_fs_path("/test-warehouse/%s" % unique_database)
file_name = "alltypes.parq"
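    # Source file from the pre-loaded alltypesagg test data, known to contain
    # 1000 rows.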
src_file = get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
"day=9/*.parq")
file_num_rows = 1000
self.client.execute("""
create table %s like functional.alltypes stored as parquet
location '%s'
""" % (table_name, table_location))
self.client.execute("alter table %s add partition (year=2010, month=1)" %
table_name)
self.client.execute("refresh %s" % table_name)
# Check that there is no data in table
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
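    # Copy a data file directly into the partition directory, bypassing both
    # Impala and Hive.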
dst_path = "%s/year=2010/month=1/%s" % (table_location, file_name)
self.filesystem_client.copy(src_file, dst_path, overwrite=True)
# Check that data added is not visible before refresh
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
    # Check that data is visible after refresh
self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(file_num_rows)]
# Check that after deleting the file and refreshing, it returns zero rows
check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]

  def test_confirm_individual_refresh(self, vector, unique_database):
    """
    Data added directly to HDFS is only visible for the partition that was
    refreshed.
    """
table_name = unique_database + '.' + "partition_test_table"
table_location = get_fs_path("/test-warehouse/%s" % unique_database)
file_name = "alltypes.parq"
src_file = get_fs_path("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
"day=9/*.parq")
file_num_rows = 1000
self.client.execute("""
create table %s like functional.alltypes stored as parquet
location '%s'
""" % (table_name, table_location))
for month in [1, 2]:
self.client.execute("alter table %s add partition (year=2010, month=%s)" %
(table_name, month))
self.client.execute("refresh %s" % table_name)
# Check that there is no data in table
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
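    # Copy the same data file into both partition directories, bypassing the
    # catalog.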
dst_path = table_location + "/year=2010/month=%s/" + file_name
for month in [1, 2]:
self.filesystem_client.copy(src_file, dst_path % month, overwrite=True)
# Check that data added is not visible before refresh
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(0)]
# Check that data is visible after refresh on the first partition only
self.client.execute("refresh %s partition (year=2010, month=1)" %
table_name)
result = self.client.execute("select count(*) from %s" % table_name)
assert result.data == [str(file_num_rows)]
# Check that the data is not yet visible for the second partition
# that was not refreshed
result = self.client.execute(
"select count(*) from %s where year=2010 and month=2" % table_name)
assert result.data == [str(0)]
# Check that data is visible for the second partition after refresh
self.client.execute("refresh %s partition (year=2010, month=2)" % table_name)
result = self.client.execute("select count(*) from %s" % table_name)
    assert result.data == [str(file_num_rows * 2)]