# blob: 7ee7db3e518398aa7a503eeafcdf01502ae38314 [file]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
import requests
import time
from tests.common.impala_connection import ERROR, FINISHED
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.common.skip import SkipIfLocal, SkipIfFS
from tests.util.filesystem_utils import WAREHOUSE
@SkipIfLocal.hdfs_client
class TestRecursiveListing(ImpalaTestSuite):
  """
  This class tests that files are recursively listed within directories
  and partitions, and that REFRESH picks up changes within them.
  """
  enable_fs_tracing_url = "http://localhost:25020/set_java_loglevel?" \
      "class=org.apache.impala.common.FileSystemUtil&level=trace"
  reset_log_level_url = "http://localhost:25020/reset_java_loglevel"
  # Upper bound (seconds) for the log-level HTTP requests above, so that an
  # unresponsive web server fails the test instead of blocking it forever.
  web_request_timeout_s = 60

  def _show_files(self, table):
    """Runs SHOW FILES on 'table' and returns the 'data' of the result."""
    files = self.client.execute("show files in {0}".format(table))
    return files.data

  def _get_rows(self, table):
    """Runs SELECT * on 'table' and returns the 'data' of the result."""
    result = self.client.execute("select * from {0}".format(table))
    return result.data

  def _refresh(self, table):
    """Issues REFRESH on 'table' and expects the statement to succeed."""
    self.execute_query_expect_success(self.client, "refresh {0}".format(table))

  def _assert_file_and_row_count(self, table, expected):
    """Asserts that SHOW FILES and SELECT * on 'table' both return 'expected' entries.

    Every data file written by these tests holds a single line, so the number of
    visible files and the number of rows are always expected to match."""
    files = self._show_files(table)
    assert len(files) == expected
    rows = self._get_rows(table)
    assert len(rows) == expected

  def test_unpartitioned(self, unique_database):
    self._do_test(unique_database, partitioned=False)

  def test_partitioned(self, unique_database):
    self._do_test(unique_database, partitioned=True)

  def _init_test_table(self, unique_database, partitioned):
    """Creates text table 't' in 'unique_database' at an explicit warehouse location.

    If 'partitioned' is true the table is partitioned by (p int) and partition p=1 is
    added. Returns a (fully qualified table name, data dir) tuple, where the data dir
    is the p=1 partition dir for a partitioned table and the table dir otherwise."""
    tbl_name = "t"
    fq_tbl_name = unique_database + "." + tbl_name
    tbl_path = '%s/%s.db/%s' % (WAREHOUSE, unique_database, tbl_name)

    # Create the table
    self.execute_query_expect_success(self.client,
        ("create table {tbl} (a string) {partclause} "
         "stored as textfile location '{loc}'").format(
            tbl=fq_tbl_name,
            partclause="partitioned by (p int)" if partitioned else "",
            loc=tbl_path))

    if partitioned:
      self.execute_query_expect_success(self.client,
          "alter table {0} add partition (p=1)".format(fq_tbl_name))
      part_path = tbl_path + "/p=1"
    else:
      part_path = tbl_path
    return fq_tbl_name, part_path

  def _do_test(self, unique_database, partitioned):
    """Shared body of test_unpartitioned and test_partitioned.

    Adds and removes files below the data dir of the test table and checks after each
    step that SHOW FILES and SELECT * reflect the expected number of files."""
    fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned)

    # Add a file inside a nested directory and refresh.
    self.filesystem_client.make_dir("{0}/dir1".format(part_path))
    self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path), "file1")
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 1)

    # Add another file inside the same directory, make sure it shows up.
    self.filesystem_client.create_file("{0}/dir1/file2.txt".format(part_path), "file2")
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 2)

    # Add a file at the top level, make sure it shows up.
    self.filesystem_client.create_file("{0}/file3.txt".format(part_path), "file3")
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 3)

    # Create files in nested hidden directories and refresh after each one. Make sure
    # none of them shows up.
    hidden_dir_files = [
      (".hive-staging", "file3.txt"),
      ("_tmp.base_000000_1", "000000_0.manifest"),
      ("-tmp.base_000000_1", "000000_0.manifest"),
      ("_impala_insert_staging", "bc4e15747fc7d788-f632b3b300000000_944410164_data.0.txt")
    ]
    for dir_name, file_name in hidden_dir_files:
      self.filesystem_client.make_dir("{0}/{1}".format(part_path, dir_name))
      self.filesystem_client.create_file(
          "{0}/{1}/{2}".format(part_path, dir_name, file_name), "shouldntreadthis")
      self._refresh(fq_tbl_name)
      self._assert_file_and_row_count(fq_tbl_name, 3)

    # Test that disabling recursive listings makes the nested files disappear.
    self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties("
        "'impala.disable.recursive.listing'='true')").format(fq_tbl_name))
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 1)

    # Also test initial metadata loading
    self.execute_query_expect_success(
        self.client, "invalidate metadata {0}".format(fq_tbl_name))
    self._assert_file_and_row_count(fq_tbl_name, 1)

    # Re-enable.
    self.execute_query_expect_success(self.client, ("alter table {0} set tblproperties("
        "'impala.disable.recursive.listing'='false')").format(fq_tbl_name))
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 3)

    # Remove the dir with two files. One should remain.
    self.filesystem_client.delete_file_dir("{0}/dir1".format(part_path), recursive=True)
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 1)

    # Verify that TRUNCATE removes data files in subdirectories too.
    # Regression test for IMPALA-13778.
    self.filesystem_client.make_dir("{0}/dir1".format(part_path))
    self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path), "file1")
    self._refresh(fq_tbl_name)
    self._assert_file_and_row_count(fq_tbl_name, 2)
    self.execute_query_expect_success(self.client, "truncate {0}".format(fq_tbl_name))
    self._assert_file_and_row_count(fq_tbl_name, 0)

    # Verify that INSERT OVERWRITE removes data files in subdirectories too.
    # Regression test for IMPALA-13778.
    self.filesystem_client.create_file("{0}/file1.txt".format(part_path), "file1")
    self.filesystem_client.make_dir("{0}/dir1".format(part_path))
    self.filesystem_client.create_file("{0}/dir1/file1.txt".format(part_path), "file1")
    # Also add a hidden and an ignored dir.
    self.filesystem_client.make_dir("{0}/_tmp.hiddendir".format(part_path))
    self.filesystem_client.create_file(
        "{0}/_tmp.hiddendir/file1.txt".format(part_path), "file1")
    self.filesystem_client.make_dir("{0}/-tmp.ignoreddir".format(part_path))
    self.filesystem_client.create_file(
        "{0}/-tmp.ignoreddir/file1.txt".format(part_path), "file1")
    self.execute_query_expect_success(self.client,
        "insert overwrite {tbl} {part} select 'str'".format(
            tbl=fq_tbl_name, part="partition(p=1)" if partitioned else ""))
    self._assert_file_and_row_count(fq_tbl_name, 1)

    # The hidden/ignored dirs and their files are expected to still exist after the
    # INSERT OVERWRITE only in the unpartitioned case.
    leftovers = [
      "_tmp.hiddendir",
      "_tmp.hiddendir/file1.txt",
      "-tmp.ignoreddir",
      "-tmp.ignoreddir/file1.txt",
    ]
    for leftover in leftovers:
      leftover_path = "{0}/{1}".format(part_path, leftover)
      assert (not partitioned) == self.filesystem_client.exists(leftover_path)

  @SkipIfFS.no_partial_listing
  @pytest.mark.execute_serially
  def test_large_staging_dirs(self, unique_database):
    """Regression test for IMPALA-11464:
    Test REFRESH survives with concurrent add/remove ops on large staging/tmp dirs
    which contain more than 1000 files. Execute this serially since the sleep intervals
    might not work with concurrent workloads. Test this only on HDFS since other FS might
    not have partial listing (configured by dfs.ls.limit)."""
    fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned=True)
    staging_dir = "{0}/.hive-staging".format(part_path)
    # Expected timeline (before the fix of IMPALA-11464):
    # 0ms:   catalogd lists the partition dir and waits 200ms.
    # 200ms: catalogd gets the staging dir and lists it partially, then waits 200ms.
    # 300ms: the staging dir is removed.
    # 400ms: catalogd consumes the partial list (1000 items), starts listing the
    #        remaining items and gets FileNotFoundException since the dir is removed.
    # After the fix of IMPALA-11464, catalogd won't list the staging dir, so it avoids
    # hitting the exception.
    self._test_listing_large_dir(fq_tbl_name, large_dir=staging_dir,
                                 pause_ms_after_remote_iterator_creation=200,
                                 pause_ms_before_file_cleanup=300,
                                 refresh_should_fail=False)

  @SkipIfFS.no_partial_listing
  @pytest.mark.execute_serially
  def test_partition_dir_removed_inflight(self, unique_database):
    """Test REFRESH with concurrent add/remove ops on large partition dirs
    which contain more than 1000 files. Execute this serially since the sleep
    intervals might not work with concurrent workloads. Test this only on HDFS
    since other FS might not have partial listing (configured by dfs.ls.limit)"""
    fq_tbl_name, part_path = self._init_test_table(unique_database, partitioned=True)
    # Expected timeline:
    # 0ms:   catalogd starts listing the partition dir. Gets 1000 items in the first
    #        partial listing. Then waits for 300ms.
    # 200ms: The partition dir is removed.
    # 300ms: catalogd has processed the 1000 items and starts listing the remaining
    #        items. Then gets FileNotFoundException since the partition dir disappears.
    self._test_listing_large_dir(fq_tbl_name, large_dir=part_path + '/',
                                 pause_ms_after_remote_iterator_creation=300,
                                 pause_ms_before_file_cleanup=200,
                                 refresh_should_fail=True)

  def _test_listing_large_dir(self, fq_tbl_name, large_dir,
                              pause_ms_after_remote_iterator_creation,
                              pause_ms_before_file_cleanup,
                              refresh_should_fail):
    """Runs REFRESH on 'fq_tbl_name' while 'large_dir' is removed underneath it.

    'large_dir' is recreated with 1024 files, REFRESH is started asynchronously, and
    after 'pause_ms_before_file_cleanup' milliseconds the dir is deleted again.
    'pause_ms_after_remote_iterator_creation' is the sleep (in ms) passed to the
    catalogd_pause_after_hdfs_remote_iterator_creation debug action. The query is
    expected to reach the ERROR state if 'refresh_should_fail' is true and the
    FINISHED state otherwise."""
    # We need data files more than 1000 (default of dfs.ls.limit) so the initial
    # file listing can't list all of them.
    files = [large_dir + '/' + str(i) for i in range(1024)]
    refresh_stmt = "refresh " + fq_tbl_name
    self.client.set_configuration({
      "debug_action": "catalogd_pause_after_hdfs_remote_iterator_creation:SLEEP@"
                      + str(pause_ms_after_remote_iterator_creation)
    })
    # Enable TRACE logging in FileSystemUtil for better debugging
    response = requests.get(self.enable_fs_tracing_url,
                            timeout=self.web_request_timeout_s)
    assert response.status_code == requests.codes.ok
    try:
      # Start from a clean dir so exactly the files created below are present.
      self.filesystem_client.delete_file_dir(large_dir, recursive=True)
      self.filesystem_client.make_dir(large_dir)
      self.filesystem_client.touch(files)
      LOG.info("created staging files under " + large_dir)
      handle = self.execute_query_async(refresh_stmt)
      # Wait a moment to let REFRESH finish expected partial listing on the dir.
      time.sleep(pause_ms_before_file_cleanup / 1000.0)
      LOG.info("removing staging dir " + large_dir)
      self.filesystem_client.delete_file_dir(large_dir, recursive=True)
      LOG.info("removed staging dir " + large_dir)
      expected_state = ERROR if refresh_should_fail else FINISHED
      # The last argument is presumably a timeout in seconds - confirm against
      # wait_for_impala_state().
      self.client.wait_for_impala_state(handle, expected_state, 10)
    finally:
      # Always restore the Java log level, even if the test body failed.
      requests.get(self.reset_log_level_url, timeout=self.web_request_timeout_s)