| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import getpass |
| import grp |
| import pwd |
| import pytest |
| import re |
| |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.parametrize import UniqueDatabase |
| from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, |
| SkipIfLocal, SkipIfDockerizedCluster, SkipIfCatalogV2) |
| from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_S3 |
| |
| @SkipIfLocal.hdfs_client |
| class TestInsertBehaviour(ImpalaTestSuite): |
| """Tests for INSERT behaviour that isn't covered by checking query results""" |
| @classmethod |
| def get_workload(self): |
| return 'functional-query' |
| |
| TEST_DB_NAME = "insert_empty_result_db" |
| |
| def setup_method(self, method): |
| # cleanup and create a fresh test database |
| if method.__name__ == "test_insert_select_with_empty_resultset": |
| self.cleanup_db(self.TEST_DB_NAME) |
| self.execute_query("create database if not exists {0} location " |
| "'{1}/{0}.db'".format(self.TEST_DB_NAME, WAREHOUSE)) |
| |
| def teardown_method(self, method): |
| if method.__name__ == "test_insert_select_with_empty_resultset": |
| self.cleanup_db(self.TEST_DB_NAME) |
| |
| @SkipIfADLS.eventually_consistent |
| @pytest.mark.execute_serially |
| def test_insert_removes_staging_files(self): |
| TBL_NAME = "insert_overwrite_nopart" |
| insert_staging_dir = ("test-warehouse/functional.db/%s/" |
| "_impala_insert_staging" % TBL_NAME) |
| self.filesystem_client.delete_file_dir(insert_staging_dir, recursive=True) |
| self.client.execute("INSERT OVERWRITE functional.%s " |
| "SELECT int_col FROM functional.tinyinttable" % TBL_NAME) |
| ls = self.filesystem_client.ls(insert_staging_dir) |
| assert len(ls) == 0 |
| |
| @pytest.mark.execute_serially |
| def test_insert_preserves_hidden_files(self): |
| """Test that INSERT OVERWRITE preserves hidden files in the root table directory""" |
| TBL_NAME = "insert_overwrite_nopart" |
| table_dir = "test-warehouse/functional.db/%s/" % TBL_NAME |
| hidden_file_locations = [".hidden", "_hidden"] |
| dir_locations = ["dir", ".hidden_dir"] |
| |
| for dir_ in dir_locations: |
| self.filesystem_client.make_dir(table_dir + dir_) |
| |
| # We do this here because the above 'make_dir' call doesn't make a directory for S3. |
| for dir_ in dir_locations: |
| self.filesystem_client.create_file( |
| table_dir + dir_ + '/' + hidden_file_locations[0] , '', overwrite=True) |
| |
| for file_ in hidden_file_locations: |
| self.filesystem_client.create_file(table_dir + file_, '', overwrite=True) |
| |
| self.client.execute("INSERT OVERWRITE functional.%s" |
| " SELECT int_col FROM functional.tinyinttable" % TBL_NAME) |
| |
| for file_ in hidden_file_locations: |
| assert self.filesystem_client.exists(table_dir + file_), "Hidden file {0} was " \ |
| "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_) |
| |
| for dir_ in dir_locations: |
| assert self.filesystem_client.exists(table_dir + dir_), "Directory {0} was " \ |
| "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_) |
| |
| def test_insert_ascii_nulls(self, unique_database): |
| TBL_NAME = '`{0}`.`null_insert`'.format(unique_database) |
| self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS %s" % TBL_NAME) |
| self.execute_query_expect_success(self.client, "create table %s as select '\0' s" |
| % TBL_NAME) |
| result = self.execute_query_expect_success(self.client, |
| "SELECT LENGTH(s) FROM %s" % TBL_NAME) |
| assert int(result.get_data()) == 1 |
| |
| @UniqueDatabase.parametrize(name_prefix='test_insert_alter_partition_location_db') |
| def test_insert_alter_partition_location(self, unique_database): |
| """Test that inserts after changing the location of a partition work correctly, |
| including the creation of a non-existant partition dir""" |
| part_dir = "tmp/{0}".format(unique_database) |
| qualified_part_dir = get_fs_path('/' + part_dir) |
| table_name = "`{0}`.`insert_alter_partition_location`".format(unique_database) |
| |
| self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS %s" % table_name) |
| self.filesystem_client.delete_file_dir(part_dir, recursive=True) |
| |
| self.execute_query_expect_success( |
| self.client, |
| "CREATE TABLE %s (c int) PARTITIONED BY (p int)" % table_name) |
| self.execute_query_expect_success( |
| self.client, |
| "ALTER TABLE %s ADD PARTITION(p=1)" % table_name) |
| self.execute_query_expect_success( |
| self.client, |
| "ALTER TABLE %s PARTITION(p=1) SET LOCATION '%s'" % (table_name, |
| qualified_part_dir)) |
| self.execute_query_expect_success( |
| self.client, |
| "INSERT OVERWRITE %s PARTITION(p=1) VALUES(1)" % table_name) |
| |
| result = self.execute_query_expect_success( |
| self.client, |
| "SELECT COUNT(*) FROM %s" % table_name) |
| assert int(result.get_data()) == 1 |
| |
| # Should have created the partition dir, which should contain exactly one file (not in |
| # a subdirectory) |
| assert len(self.filesystem_client.ls(part_dir)) == 1 |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @pytest.mark.xfail(run=False, reason="Fails intermittently on test clusters") |
| @pytest.mark.execute_serially |
| def test_insert_inherit_acls(self): |
| """Check that ACLs are inherited when we create new partitions""" |
| |
| ROOT_ACL = "default:group:dummy_group:rwx" |
| TEST_ACL = "default:group:impala_test_users:r-x" |
| |
| def check_has_acls(part, acl=TEST_ACL): |
| path = "test-warehouse/functional.db/insert_inherit_acls/" + part |
| result = self.hdfs_client.getacl(path) |
| assert acl in result['AclStatus']['entries'] |
| |
| # Add a spurious ACL to functional.db directory |
| self.hdfs_client.setacl("test-warehouse/functional.db", ROOT_ACL) |
| |
| self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS" |
| " functional.insert_inherit_acls") |
| self.execute_query_expect_success(self.client, "CREATE TABLE " |
| "functional.insert_inherit_acls (col int)" |
| " PARTITIONED BY (p1 int, p2 int, p3 int)") |
| |
| # Check that table creation inherited the ACL |
| check_has_acls("", ROOT_ACL) |
| |
| self.execute_query_expect_success(self.client, "ALTER TABLE " |
| "functional.insert_inherit_acls ADD PARTITION" |
| "(p1=1, p2=1, p3=1)") |
| |
| check_has_acls("p1=1", ROOT_ACL) |
| check_has_acls("p1=1/p2=1", ROOT_ACL) |
| check_has_acls("p1=1/p2=1/p3=1", ROOT_ACL) |
| |
| self.hdfs_client.setacl( |
| "test-warehouse/functional.db/insert_inherit_acls/p1=1/", TEST_ACL) |
| |
| self.execute_query_expect_success(self.client, "INSERT INTO " |
| "functional.insert_inherit_acls " |
| "PARTITION(p1=1, p2=2, p3=2) VALUES(1)") |
| |
| check_has_acls("p1=1/p2=2/") |
| check_has_acls("p1=1/p2=2/p3=2") |
| |
| # Check that SETACL didn't cascade down to children (which is more to do with HDFS |
| # than Impala, but worth asserting here) |
| check_has_acls("p1=1/p2=1", ROOT_ACL) |
| check_has_acls("p1=1/p2=1/p3=1", ROOT_ACL) |
| |
| # Change ACLs on p1=1,p2=2 and create a new leaf at p3=30 |
| self.hdfs_client.setacl( |
| "test-warehouse/functional.db/insert_inherit_acls/p1=1/p2=2/", |
| "default:group:new_leaf_group:-w-") |
| |
| self.execute_query_expect_success(self.client, "INSERT INTO " |
| "functional.insert_inherit_acls " |
| "PARTITION(p1=1, p2=2, p3=30) VALUES(1)") |
| check_has_acls("p1=1/p2=2/p3=30", "default:group:new_leaf_group:-w-") |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @SkipIfCatalogV2.impala_7539() |
| def test_insert_file_permissions(self, unique_database): |
| """Test that INSERT correctly respects file permission (minimum ACLs)""" |
| table = "`{0}`.`insert_acl_permissions`".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_acl_permissions".format(unique_database) |
| |
| insert_query = "INSERT INTO {0} VALUES(1)".format(table) |
| refresh_query = "REFRESH {0}".format(table) |
| |
| self.execute_query_expect_success(self.client, |
| "DROP TABLE IF EXISTS {0}".format(table)) |
| self.execute_query_expect_success( |
| self.client, "CREATE TABLE {0} (col int)".format(table)) |
| |
| # Check that a simple insert works |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| # Remove the permission to write and confirm that INSERTs won't work |
| self.hdfs_client.setacl(table_path, "user::r-x,group::r-x,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Now add group access, still should fail (because the user will match and take |
| # priority) |
| self.hdfs_client.setacl(table_path, "user::r-x,group::rwx,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Now make the target directory non-writable with posix permissions, but writable with |
| # ACLs (ACLs should take priority). Note: chmod affects ACLs (!) so it has to be done |
| # first. |
| self.hdfs_client.chmod(table_path, "000") |
| self.hdfs_client.setacl(table_path, "user::rwx,group::r-x,other::r-x") |
| |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| # Finally, change the owner |
| self.hdfs_client.chown(table_path, "another_user", "another_group") |
| self.execute_query_expect_success(self.client, refresh_query) |
| # Should be unwritable because 'other' ACLs don't allow writes |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Give write perms back to 'other' |
| self.hdfs_client.setacl(table_path, "user::rwx,group::r-x,other::rwx") |
| self.execute_query_expect_success(self.client, refresh_query) |
| # Should be writable because 'other' ACLs allow writes |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @SkipIfCatalogV2.impala_7539() |
| def test_mixed_partition_permissions(self, unique_database): |
| """ |
| Test that INSERT and LOAD DATA into explicit partitions is allowed even |
| if some partitions exist without appropriate permissions. |
| """ |
| table = "`{0}`.`insert_partition_perms`".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_partition_perms".format(unique_database) |
| |
| def partition_path(partition): |
| return "%s/p=%s" % (table_path, partition) |
| |
| def insert_query(partition): |
| return "INSERT INTO {0} PARTITION (p='{1}') VALUES(1)".format(table, partition) |
| |
| def load_data(query_fn, partition): |
| path = "tmp/{0}/data_file".format(unique_database) |
| self.hdfs_client.delete_file_dir(path) |
| self.hdfs_client.create_file(path, "1") |
| q = "LOAD DATA INPATH '/{0}' INTO TABLE {1} PARTITION (p='{2}')".format( |
| path, table, partition) |
| return query_fn(self.client, q) |
| |
| insert_dynamic_partition = \ |
| "INSERT INTO {0} (col, p) SELECT col, p from {0}".format(table) |
| |
| insert_dynamic_partition_2 = \ |
| "INSERT INTO {0} PARTITION(p) SELECT col, p from {0}".format(table) |
| |
| # TODO(todd): REFRESH does not seem to refresh the permissions of existing |
| # partitions -- need to fully reload the metadata to do so. |
| invalidate_query = "INVALIDATE METADATA {0}".format(table) |
| |
| self.execute_query_expect_success(self.client, |
| "DROP TABLE IF EXISTS {0}".format(table)) |
| self.execute_query_expect_success( |
| self.client, "CREATE TABLE {0} (col int) PARTITIONED BY (p string)".format(table)) |
| |
| # Check that a simple insert works |
| self.execute_query_expect_success(self.client, insert_query("initial_part")) |
| |
| # Add a partition with no write permissions |
| self.execute_query_expect_success( |
| self.client, |
| "ALTER TABLE {0} ADD PARTITION (p=\"no_write\")".format(table)) |
| self.hdfs_client.chown(partition_path("no_write"), "another_user", "another_group") |
| self.execute_query_expect_success(self.client, invalidate_query) |
| |
| # Check that inserts into the new partition do not work. |
| err = self.execute_query_expect_failure(self.client, insert_query("no_write")) |
| assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err)) |
| |
| err = load_data(self.execute_query_expect_failure, "no_write") |
| assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err)) |
| |
| # Inserts into the existing partition should still work |
| self.execute_query_expect_success(self.client, insert_query("initial_part")) |
| load_data(self.execute_query_expect_success, "initial_part") |
| |
| # Inserts into newly created partitions should work. |
| self.execute_query_expect_success(self.client, insert_query("added_part")) |
| |
| # TODO(IMPALA-7313): loading data cannot currently add partitions, so this |
| # is commented out. If that behavior is changed, uncomment this. |
| # load_data(self.execute_query_expect_success, "added_part_load") |
| |
| # Inserts into dynamic partitions should fail. |
| err = self.execute_query_expect_failure(self.client, insert_dynamic_partition) |
| assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err)) |
| err = self.execute_query_expect_failure(self.client, insert_dynamic_partition_2) |
| assert re.search(r'Impala does not have WRITE access.*p=no_write', str(err)) |
| |
| # If we make the top-level table directory read-only, we should still be able to |
| # insert into existing writable partitions. |
| self.hdfs_client.chown(table_path, "another_user", "another_group") |
| self.execute_query_expect_success(self.client, insert_query("added_part")) |
| load_data(self.execute_query_expect_success, "added_part") |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @SkipIfCatalogV2.impala_7539() |
| def test_readonly_table_dir(self, unique_database): |
| """ |
| Test that, if a partitioned table has a read-only base directory, |
| the frontend rejects queries that may attempt to create new |
| partitions. |
| """ |
| table = "`{0}`.`insert_partition_perms`".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_partition_perms".format(unique_database) |
| insert_dynamic_partition = \ |
| "INSERT INTO {0} (col, p) SELECT col, p from {0}".format(table) |
| |
| def insert_query(partition): |
| return "INSERT INTO {0} PARTITION (p='{1}') VALUES(1)".format(table, partition) |
| |
| self.execute_query_expect_success(self.client, |
| "DROP TABLE IF EXISTS {0}".format(table)) |
| self.execute_query_expect_success( |
| self.client, "CREATE TABLE {0} (col int) PARTITIONED BY (p string)".format(table)) |
| |
| # If we make the top-level table directory read-only, we should not be able |
| # to create new partitions, either explicitly, or dynamically. |
| self.hdfs_client.chown(table_path, "another_user", "another_group") |
| self.execute_query_expect_success(self.client, "INVALIDATE METADATA %s" % table) |
| err = self.execute_query_expect_failure(self.client, insert_dynamic_partition) |
| assert re.search(r'Impala does not have WRITE access.*' + table_path, str(err)) |
| err = self.execute_query_expect_failure(self.client, insert_query("some_new_part")) |
| assert re.search(r'Impala does not have WRITE access.*' + table_path, str(err)) |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @SkipIfDockerizedCluster.insert_acls |
| @SkipIfCatalogV2.impala_7539() |
| def test_insert_acl_permissions(self, unique_database): |
| """Test that INSERT correctly respects ACLs""" |
| table = "`{0}`.`insert_acl_permissions`".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_acl_permissions".format(unique_database) |
| |
| insert_query = "INSERT INTO %s VALUES(1)" % table |
| refresh_query = "REFRESH {0}".format(table) |
| |
| self.execute_query_expect_success(self.client, |
| "DROP TABLE IF EXISTS {0}".format(table)) |
| self.execute_query_expect_success(self.client, |
| "CREATE TABLE {0} (col int)".format(table)) |
| |
| # Check that a simple insert works |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| user = getpass.getuser() |
| group = grp.getgrgid(pwd.getpwnam(user).pw_gid).gr_name |
| # First, change the owner to someone other than user who runs impala service |
| self.hdfs_client.chown(table_path, "another_user", group) |
| |
| # Remove the permission to write and confirm that INSERTs won't work |
| self.hdfs_client.setacl(table_path, |
| "user::r-x,user:" + user + ":r-x,group::r-x,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Add the permission to write. if we're not the owner of the file, INSERTs should work |
| self.hdfs_client.setacl(table_path, |
| "user::r-x,user:" + user + ":rwx,group::r-x,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| # Now add group access, still should fail (because the user will match and take |
| # priority) |
| self.hdfs_client.setacl(table_path, |
| "user::r-x,user:" + user + ":r-x,group::rwx,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Check that the mask correctly applies to the anonymous group ACL |
| self.hdfs_client.setacl(table_path, "user::r-x,group::rwx,other::rwx,mask::r--") |
| self.execute_query_expect_success(self.client, refresh_query) |
| # Should be unwritable because mask applies to unnamed group and disables writing |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Check that the mask correctly applies to the named user ACL |
| self.hdfs_client.setacl(table_path, "user::r-x,user:" + user + |
| ":rwx,group::r-x,other::rwx,mask::r--") |
| self.execute_query_expect_success(self.client, refresh_query) |
| # Should be unwritable because mask applies to named user and disables writing |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Now make the target directory non-writable with posix permissions, but writable with |
| # ACLs (ACLs should take priority). Note: chmod affects ACLs (!) so it has to be done |
| # first. |
| self.hdfs_client.chmod(table_path, "000") |
| self.hdfs_client.setacl(table_path, "user::rwx,user:foo:rwx,group::rwx,other::r-x") |
| |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| # Finally, change the owner/group |
| self.hdfs_client.chown(table_path, "test_user", "invalid") |
| |
| self.execute_query_expect_success(self.client, refresh_query) |
| # Should be unwritable because 'other' ACLs don't allow writes |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| # Give write perms back to 'other' |
| self.hdfs_client.setacl(table_path, "user::r-x,user:foo:rwx,group::r-x,other::rwx") |
| self.execute_query_expect_success(self.client, refresh_query) |
| # Should be writable because 'other' ACLs allow writes |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @SkipIfCatalogV2.impala_7539() |
| def test_load_permissions(self, unique_database): |
| # We rely on test_insert_acl_permissions() to exhaustively check that ACL semantics |
| # are correct. Here we just validate that LOADs can't be done when we cannot read from |
| # or write to the src directory, or write to the dest directory. |
| |
| table = "`{0}`.`load_acl_permissions`".format(unique_database) |
| table_path = "test-warehouse/{0}.db/load_acl_permissions".format(unique_database) |
| |
| file_path = "tmp/{0}".format(unique_database) |
| file_name = "%s/impala_data_file" % file_path |
| |
| load_file_query = "LOAD DATA INPATH '/%s' INTO TABLE %s" % (file_name, table) |
| load_dir_query = "LOAD DATA INPATH '/%s' INTO TABLE %s" % (file_path, table) |
| |
| refresh_query = "REFRESH {0}".format(table) |
| |
| self.hdfs_client.make_dir(file_path) |
| |
| self.hdfs_client.setacl(file_path, "user::rwx,group::rwx,other::---") |
| self.execute_query_expect_success(self.client, |
| "DROP TABLE IF EXISTS {0}".format(table)) |
| self.execute_query_expect_success( |
| self.client, "CREATE TABLE {0} (col int)".format(table)) |
| self.hdfs_client.delete_file_dir(file_name) |
| self.hdfs_client.create_file(file_name, "1") |
| |
| self.execute_query_expect_success(self.client, load_file_query) |
| |
| # Now remove write perms from the source directory |
| self.hdfs_client.create_file(file_name, "1") |
| self.hdfs_client.setacl(file_path, "user::---,group::---,other::---") |
| self.hdfs_client.setacl(table_path, "user::rwx,group::r-x,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, load_file_query) |
| self.execute_query_expect_failure(self.client, load_dir_query) |
| |
| # Remove write perms from target |
| self.hdfs_client.setacl(file_path, "user::rwx,group::rwx,other::rwx") |
| self.hdfs_client.setacl(table_path, "user::r-x,group::r-x,other::r-x") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, load_file_query) |
| self.execute_query_expect_failure(self.client, load_dir_query) |
| |
| # Finally remove read perms from file itself |
| self.hdfs_client.setacl(file_name, "user::-wx,group::rwx,other::rwx") |
| self.hdfs_client.setacl(table_path, "user::rwx,group::rwx,other::rwx") |
| self.execute_query_expect_success(self.client, refresh_query) |
| self.execute_query_expect_failure(self.client, load_file_query) |
| # We expect this to succeed, it's not an error if all files in the dir cannot be read |
| self.execute_query_expect_success(self.client, load_dir_query) |
| |
| @SkipIfADLS.eventually_consistent |
| @pytest.mark.execute_serially |
| def test_insert_select_with_empty_resultset(self): |
| """Test insert/select query won't trigger partition directory or zero size data file |
| creation if the resultset of select is empty.""" |
| def check_path_exists(path, should_exist): |
| assert self.filesystem_client.exists(path) == should_exist, "file/dir '{0}' " \ |
| "should {1}exist but does {2}exist.".format( |
| path, '' if should_exist else 'not ', 'not ' if should_exist else '') |
| |
| db_path = "test-warehouse/%s.db/" % self.TEST_DB_NAME |
| table_path = db_path + "test_insert_empty_result" |
| partition_path = "{0}/year=2009/month=1".format(table_path) |
| check_path_exists(table_path, False) |
| |
| table_name = self.TEST_DB_NAME + ".test_insert_empty_result" |
| self.execute_query_expect_success( |
| self.client, |
| "CREATE TABLE %s (id INT, col INT) PARTITIONED BY (year INT, month " |
| "INT)" % table_name) |
| check_path_exists(table_path, True) |
| check_path_exists(partition_path, False) |
| |
| # Run an insert/select stmt that returns an empty resultset. |
| insert_query = ("INSERT INTO TABLE {0} PARTITION(year=2009, month=1)" |
| "select 1, 1 from {0} LIMIT 0".format(table_name)) |
| self.execute_query_expect_success(self.client, insert_query) |
| # Partition directory is created |
| check_path_exists(partition_path, True) |
| |
| # Insert one record |
| insert_query_one_row = ("INSERT INTO TABLE %s PARTITION(year=2009, month=1) " |
| "values(2, 2)" % table_name) |
| self.execute_query_expect_success(self.client, insert_query_one_row) |
| # Partition directory should be created with one data file |
| check_path_exists(partition_path, True) |
| ls = (self.filesystem_client.ls(partition_path)) |
| assert len(ls) == 1 |
| |
| # Run an insert/select statement that returns an empty resultset again |
| self.execute_query_expect_success(self.client, insert_query) |
| # No new data file should be created |
| new_ls = self.filesystem_client.ls(partition_path) |
| assert len(new_ls) == 1 |
| assert new_ls == ls |
| |
| # Run an insert overwrite/select that returns an empty resultset |
| insert_query = ("INSERT OVERWRITE {0} PARTITION(year=2009, month=1)" |
| " select 1, 1 from {0} LIMIT 0".format(table_name)) |
| self.execute_query_expect_success(self.client, insert_query) |
| # Data file should be deleted |
| new_ls2 = self.filesystem_client.ls(partition_path) |
| assert len(new_ls2) == 0 |
| assert new_ls != new_ls2 |
| |
| # Test for IMPALA-2008 insert overwrite to an empty table with empty dataset |
| empty_target_tbl = "test_overwrite_with_empty_target" |
| create_table = "create table %s.%s (id INT, col INT)" % (self.TEST_DB_NAME, |
| empty_target_tbl) |
| self.execute_query_expect_success(self.client, create_table) |
| insert_query = ("INSERT OVERWRITE {0}.{1} select 1, 1 from {0}.{1} LIMIT 0" |
| .format(self.TEST_DB_NAME, empty_target_tbl)) |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| # Delete target table directory, query should fail with |
| # "No such file or directory" error |
| target_table_path = "%s%s" % (db_path, empty_target_tbl) |
| self.filesystem_client.delete_file_dir(target_table_path, recursive=True) |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| @SkipIfS3.hdfs_acls |
| @SkipIfABFS.hdfs_acls |
| @SkipIfADLS.hdfs_acls |
| @SkipIfIsilon.hdfs_acls |
| @SkipIfDockerizedCluster.insert_acls |
| @SkipIfCatalogV2.impala_7539() |
| def test_multiple_group_acls(self, unique_database): |
| """Test that INSERT correctly respects multiple group ACLs""" |
| table = "`{0}`.`insert_group_acl_permissions`".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_group_acl_permissions".format( |
| unique_database) |
| insert_query = "INSERT INTO %s VALUES(1)" % table |
| |
| self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS " + table) |
| self.execute_query_expect_success(self.client, "CREATE TABLE %s (col int)" % table) |
| |
| user = getpass.getuser() |
| test_user = "test_user" |
| # Get the list of all groups of user except the user's owning group. |
| owning_group = grp.getgrgid(pwd.getpwnam(user).pw_gid).gr_name |
| groups = [g.gr_name for g in grp.getgrall() if user in g.gr_mem] |
| if (len(groups) < 1): |
| pytest.xfail(reason="Cannot run test, user belongs to only one group.") |
| |
| # First, change the owner to someone other than user who runs impala service |
| self.hdfs_client.chown(table_path, "another_user", owning_group) |
| |
| # Set two group ACLs, one contains requested permission, the other doesn't. |
| self.hdfs_client.setacl( |
| table_path, |
| "user::r-x,user:{0}:r-x,group::---,group:{1}:rwx," |
| "other::r-x".format(test_user, groups[0])) |
| self.execute_query_expect_success(self.client, "REFRESH " + table) |
| self.execute_query_expect_success(self.client, insert_query) |
| |
| # Two group ACLs but with mask to deny the permission. |
| self.hdfs_client.setacl( |
| table_path, |
| "user::r-x,group::r--,group:{0}:rwx,mask::r-x," |
| "other::---".format(groups[0])) |
| self.execute_query_expect_success(self.client, "REFRESH " + table) |
| self.execute_query_expect_failure(self.client, insert_query) |
| |
| def test_clustered_partition_single_file(self, unique_database): |
| """IMPALA-2523: Tests that clustered insert creates one file per partition, even when |
| inserting over multiple row batches.""" |
| # On s3 this test takes about 220 seconds and we are unlikely to break it, so only run |
| # it in exhaustive strategy. |
| if self.exploration_strategy() != 'exhaustive' and IS_S3: |
| pytest.skip("only runs in exhaustive") |
| table = "{0}.insert_clustered".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database) |
| table_location = get_fs_path("/" + table_path) |
| |
| create_stmt = """create table {0} like functional.alltypes""".format(table) |
| self.execute_query_expect_success(self.client, create_stmt) |
| |
| set_location_stmt = """alter table {0} set location '{1}'""".format( |
| table, table_location) |
| self.execute_query_expect_success(self.client, set_location_stmt) |
| |
| # Setting a lower batch size will result in multiple row batches being written. |
| self.execute_query_expect_success(self.client, "set batch_size=10") |
| |
| insert_stmt = """insert into {0} partition(year, month) /*+ clustered,shuffle */ |
| select * from functional.alltypes""".format(table) |
| self.execute_query_expect_success(self.client, insert_stmt) |
| |
| # We expect exactly one partition per year and month, since subsequent row batches of |
| # a partition will be written into the same file. |
| expected_partitions = \ |
| ["year=%s/month=%s" % (y, m) for y in [2009, 2010] for m in range(1,13)] |
| |
| for partition in expected_partitions: |
| partition_path = "{0}/{1}".format(table_path, partition) |
| files = self.filesystem_client.ls(partition_path) |
| assert len(files) == 1, "%s: %s" % (partition, files) |
| |
| def test_clustered_partition_multiple_files(self, unique_database): |
| """IMPALA-2523: Tests that clustered insert creates the right number of files per |
| partition when inserting over multiple row batches.""" |
| # This test takes about 30 seconds and we are unlikely to break it, so only run it in |
| # exhaustive strategy. |
| if self.exploration_strategy() != 'exhaustive': |
| pytest.skip("only runs in exhaustive") |
| table = "{0}.insert_clustered".format(unique_database) |
| table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database) |
| table_location = get_fs_path("/" + table_path) |
| |
| create_stmt = """create table {0} ( |
| l_orderkey BIGINT, |
| l_partkey BIGINT, |
| l_suppkey BIGINT, |
| l_linenumber INT, |
| l_quantity DECIMAL(12,2), |
| l_extendedprice DECIMAL(12,2), |
| l_discount DECIMAL(12,2), |
| l_tax DECIMAL(12,2), |
| l_linestatus STRING, |
| l_shipdate STRING, |
| l_commitdate STRING, |
| l_receiptdate STRING, |
| l_shipinstruct STRING, |
| l_shipmode STRING, |
| l_comment STRING) |
| partitioned by (l_returnflag STRING) stored as parquet |
| """.format(table) |
| self.execute_query_expect_success(self.client, create_stmt) |
| |
| set_location_stmt = """alter table {0} set location '{1}'""".format( |
| table, table_location) |
| self.execute_query_expect_success(self.client, set_location_stmt) |
| |
| # Setting a lower parquet file size will result in multiple files being written. |
| self.execute_query_expect_success(self.client, "set parquet_file_size=10485760") |
| |
| insert_stmt = """insert into {0} partition(l_returnflag) /*+ clustered,shuffle */ |
| select l_orderkey, |
| l_partkey, |
| l_suppkey, |
| l_linenumber, |
| l_quantity, |
| l_extendedprice, |
| l_discount, |
| l_tax, |
| l_linestatus, |
| l_shipdate, |
| l_commitdate, |
| l_receiptdate, |
| l_shipinstruct, |
| l_shipmode, |
| l_comment, |
| l_returnflag |
| from tpch_parquet.lineitem""".format(table) |
| self.execute_query_expect_success(self.client, insert_stmt) |
| |
| expected_partition_files = [("l_returnflag=A", 3, 30), |
| ("l_returnflag=N", 3, 30), |
| ("l_returnflag=R", 3, 30)] |
| |
| for partition, min_files, max_files in expected_partition_files: |
| partition_path = "{0}/{1}".format(table_path, partition) |
| files = self.filesystem_client.ls(partition_path) |
| assert min_files <= len(files) and len(files) <= max_files, \ |
| "%s: %s" % (partition, files) |