blob: 61ef7c820b0c7453d875df6ba84d8d4c58b705fb [file]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from subprocess import check_call
from tests.common.impala_connection import IMPALA_CONNECTION_EXCEPTION
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfFS
from tests.common.test_vector import HS2
from tests.util.filesystem_utils import get_fs_path
@SkipIfFS.hive
class TestRefreshPartition(ImpalaTestSuite):
  """
  This class tests the functionality to refresh a partition individually
  for a table in HDFS.
  """

  # Glob (passed through get_fs_path()) of the Parquet data that the HDFS-level
  # tests below copy into partition directories.
  _SRC_PARQUET_GLOB = ("/test-warehouse/alltypesagg_parquet/year=2010/month=1/"
                       "day=9/*.parq")
  # Name the copied data is given inside each partition directory.
  _DST_PARQUET_FILE_NAME = "alltypes.parq"
  # Number of rows one copy of _SRC_PARQUET_GLOB adds to a partition, as asserted
  # by the tests below.
  _SRC_PARQUET_NUM_ROWS = 1000

  @classmethod
  def default_test_protocol(cls):
    """Select HS2 as the default client protocol for the tests in this class."""
    return HS2

  def _assert_row_count(self, table_name, expected_rows, predicate=None):
    """Assert that 'select count(*)' on 'table_name' returns exactly one row whose
    value is 'expected_rows'. If 'predicate' is given it is used as the WHERE
    clause of the query."""
    query = "select count(*) from %s" % table_name
    if predicate is not None:
      query += " where %s" % predicate
    result = self.client.execute(query)
    assert result.data == [str(expected_rows)]

  def _select_after_external_drop(self, table_name, expected_error):
    """Scan 'table_name' after some of its partitions were dropped outside Impala.

    With file handle caching the scan may not produce an error, because the file
    handles are still open in the cache. If the scan does fail, the error message
    must contain 'expected_error'.
    """
    try:
      self.client.execute("select * from %s" % table_name)
    except IMPALA_CONNECTION_EXCEPTION as e:
      assert expected_error in str(e)

  def _create_empty_parquet_table(self, table_name, table_location, months):
    """Create 'table_name' like functional.alltypes, stored as Parquet at
    'table_location'; add a (year=2010, month=m) partition for each m in 'months';
    refresh the table and assert that it contains no rows yet."""
    self.client.execute("""
        create table %s like functional.alltypes stored as parquet
        location '%s'
        """ % (table_name, table_location))
    for month in months:
      self.client.execute("alter table %s add partition (year=2010, month=%d)" %
          (table_name, month))
    self.client.execute("refresh %s" % table_name)
    self._assert_row_count(table_name, 0)

  @classmethod
  def _partition_file_path(cls, table_location, month):
    """Return the path the test Parquet data is copied to for partition
    (year=2010, month='month') of the table located at 'table_location'."""
    return "%s/year=2010/month=%d/%s" % (
        table_location, month, cls._DST_PARQUET_FILE_NAME)

  def test_refresh_invalid_partition(self, unique_database):
    """
    Trying to refresh a partition that does not exist does not modify anything
    either in impala or hive.
    """
    table_name = unique_database + '.' + "partition_test_table"
    self.client.execute(
        'create table %s (x int) partitioned by (y int, z int)' % table_name)
    self.client.execute(
        'alter table %s add partition (y=333, z=5309)' % table_name)

    def assert_only_original_partition():
      # The partition added above must be the only one known to both Impala and
      # Hive.
      assert [('333', '5309')] == self.get_impala_partition_info(table_name, 'y', 'z')
      assert ['y=333/z=5309'] == self.hive_partition_names(table_name)

    assert_only_original_partition()
    # Refreshing a single non-existent partition changes nothing.
    self.client.execute('refresh %s partition (y=71, z=8857)' % table_name)
    assert_only_original_partition()
    # Neither does refreshing several non-existent partitions in one statement.
    self.client.execute(
        'refresh %s partition (y=71, z=8857) partition (y=0, z=0)' % table_name)
    assert_only_original_partition()

  def test_remove_data_and_refresh(self, unique_database):
    """
    Data removed through hive is visible in impala after refresh of partition.
    """
    expected_error = 'Error(2): No such file or directory'
    table_name = unique_database + '.' + "partition_test_table"
    self.client.execute(
        'create table %s (x int) partitioned by (y int, z int)' % table_name)
    self.client.execute(
        'alter table %s add partition (y=333, z=5309)' % table_name)
    self.client.execute(
        'insert into table %s partition (y=333, z=5309) values (2)' % table_name)
    assert '2\t333\t5309' == self.client.execute(
        'select * from %s' % table_name).get_data()

    # Single partition dropped in Hive.
    self.run_stmt_in_hive(
        'alter table %s drop partition (y=333, z=5309)' % table_name)
    self._select_after_external_drop(table_name, expected_error)
    self.client.execute('refresh %s partition (y=333, z=5309)' % table_name)
    self._assert_row_count(table_name, 0)

    # Test multiple partitions.
    self.client.execute(
        'insert into table %s partition (y, z) values '
        '(2, 33, 444), (3, 44, 555), (4, 55, 666)' % table_name)
    result = self.client.execute('select * from %s' % table_name)
    assert '2\t33\t444' in result.data
    assert '3\t44\t555' in result.data
    assert '4\t55\t666' in result.data
    assert len(result.data) == 3
    # Drop the two partitions with y > 33 in Hive.
    self.run_stmt_in_hive(
        'alter table %s drop partition (y>33)' % table_name)
    self._select_after_external_drop(table_name, expected_error)
    # Refresh all three partitions in one statement; only the row of the
    # (y=33, z=444) partition, which was not dropped, remains.
    self.client.execute(
        'refresh %s partition (y=33, z=444) partition (y=44, z=555) '
        'partition (y=55, z=666)' % table_name)
    self._assert_row_count(table_name, 1)

  def test_add_delete_data_to_hdfs_and_refresh(self, unique_database):
    """
    Data added/deleted directly in HDFS is visible in impala after refresh of
    partition.
    """
    table_name = unique_database + '.' + "partition_test_table"
    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
    src_file = get_fs_path(self._SRC_PARQUET_GLOB)
    file_num_rows = self._SRC_PARQUET_NUM_ROWS
    self._create_empty_parquet_table(table_name, table_location, range(1, 5))

    # Single partition.
    dst_path = self._partition_file_path(table_location, 1)
    self.filesystem_client.copy(src_file, dst_path, overwrite=True)
    # Data added is not visible before refresh...
    self._assert_row_count(table_name, 0)
    # ...but is visible after the partition is refreshed.
    self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
    self._assert_row_count(table_name, file_num_rows)
    # After deleting the file and refreshing, the table returns zero rows.
    check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
    self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
    self._assert_row_count(table_name, 0)

    # Multiple partitions refreshed by a single statement.
    multi_refresh = (
        "refresh %s partition (year=2010, month=2) partition (year=2010, month=3) "
        "partition (year=2010, month=4)" % table_name)
    for month in range(2, 5):
      dst_path = self._partition_file_path(table_location, month)
      self.filesystem_client.copy(src_file, dst_path, overwrite=True)
    # Data added is not visible before refresh...
    self._assert_row_count(table_name, 0)
    # ...but is visible after the partitions are refreshed.
    self.client.execute(multi_refresh)
    self._assert_row_count(table_name, file_num_rows * 3)
    # After deleting the files and refreshing, the table returns zero rows.
    for month in range(2, 5):
      dst_path = self._partition_file_path(table_location, month)
      check_call(["hadoop", "fs", "-rm", dst_path], shell=False)
    self.client.execute(multi_refresh)
    self._assert_row_count(table_name, 0)

  def test_confirm_individual_refresh(self, unique_database):
    """
    Data added directly to HDFS is only visible for the partition refreshed.
    """
    table_name = unique_database + '.' + "partition_test_table"
    table_location = get_fs_path("/test-warehouse/%s" % unique_database)
    src_file = get_fs_path(self._SRC_PARQUET_GLOB)
    file_num_rows = self._SRC_PARQUET_NUM_ROWS
    self._create_empty_parquet_table(table_name, table_location, range(1, 6))

    for month in range(1, 6):
      dst_path = self._partition_file_path(table_location, month)
      self.filesystem_client.copy(src_file, dst_path, overwrite=True)
    # Data added is not visible before refresh.
    self._assert_row_count(table_name, 0)
    # Refreshing the first partition exposes only that partition's data.
    self.client.execute("refresh %s partition (year=2010, month=1)" % table_name)
    self._assert_row_count(table_name, file_num_rows)
    # The second partition, which was not refreshed, still appears empty.
    self._assert_row_count(table_name, 0, predicate="year=2010 and month=2")
    # Refreshing the second partition makes its data visible.
    self.client.execute("refresh %s partition (year=2010, month=2)" % table_name)
    self._assert_row_count(table_name, file_num_rows * 2)
    # Refresh the remaining partitions in one statement.
    self.client.execute(
        "refresh %s partition (year=2010, month=3) partition (year=2010, month=4) "
        "partition (year=2010, month=5)" % table_name)
    self._assert_row_count(table_name, file_num_rows * 5)