| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import getpass |
| import itertools |
| import pytest |
| import re |
| import time |
| |
| from test_ddl_base import TestDdlBase |
| from tests.beeswax.impala_beeswax import ImpalaBeeswaxException |
| from tests.common.environ import (HIVE_MAJOR_VERSION) |
| from tests.common.impala_test_suite import LOG |
| from tests.common.parametrize import UniqueDatabase |
| from tests.common.skip import (SkipIf, SkipIfABFS, SkipIfADLS, SkipIfKudu, SkipIfLocal, |
| SkipIfCatalogV2, SkipIfHive2, SkipIfS3) |
| from tests.common.test_dimensions import create_single_exec_option_dimension |
| from tests.util.filesystem_utils import ( |
| WAREHOUSE, |
| IS_HDFS, |
| IS_S3, |
| IS_ADLS, |
| FILESYSTEM_NAME) |
| from tests.common.impala_cluster import ImpalaCluster |
| |
| # Validates DDL statements (create, drop) |
| class TestDdlStatements(TestDdlBase): |
| @SkipIfLocal.hdfs_client |
| def test_drop_table_with_purge(self, unique_database): |
| """This test checks if the table data is permanently deleted in |
| DROP TABLE <tbl> PURGE queries""" |
| self.client.execute("create table {0}.t1(i int)".format(unique_database)) |
| self.client.execute("create table {0}.t2(i int)".format(unique_database)) |
| # Create sample test data files under the table directories |
| self.filesystem_client.create_file("test-warehouse/{0}.db/t1/t1.txt".\ |
| format(unique_database), file_data='t1') |
| self.filesystem_client.create_file("test-warehouse/{0}.db/t2/t2.txt".\ |
| format(unique_database), file_data='t2') |
| # Drop the table (without purge) and make sure it exists in trash |
| self.client.execute("drop table {0}.t1".format(unique_database)) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/t1.txt".\ |
| format(unique_database)) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/".\ |
| format(unique_database)) |
| assert self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/t1.txt".\ |
| format(getpass.getuser(), unique_database)) |
| assert self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1".\ |
| format(getpass.getuser(), unique_database)) |
| # Drop the table (with purge) and make sure it doesn't exist in trash |
| self.client.execute("drop table {0}.t2 purge".format(unique_database)) |
| if not IS_S3 and not IS_ADLS: |
| # In S3, deletes are eventual. So even though we dropped the table, the files |
| # belonging to this table may still be visible for some unbounded time. This |
| # happens only with PURGE. A regular DROP TABLE is just a copy of files which is |
| # consistent. |
| # The ADLS Python client is not strongly consistent, so these files may still be |
| # visible after a DROP. (Remove after IMPALA-5335 is resolved) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/".\ |
| format(unique_database)) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t2/t2.txt".\ |
| format(unique_database)) |
| assert not self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t2/t2.txt".\ |
| format(getpass.getuser(), unique_database)) |
| assert not self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t2".\ |
| format(getpass.getuser(), unique_database)) |
| # Create an external table t3 and run the same test as above. Make |
| # sure the data is not deleted |
| self.filesystem_client.make_dir( |
| "test-warehouse/{0}.db/data_t3/".format(unique_database), permission=777) |
| self.filesystem_client.create_file( |
| "test-warehouse/{0}.db/data_t3/data.txt".format(unique_database), file_data='100') |
| self.client.execute("create external table {0}.t3(i int) stored as " |
| "textfile location \'/test-warehouse/{0}.db/data_t3\'" .format(unique_database)) |
| self.client.execute("drop table {0}.t3 purge".format(unique_database)) |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/data_t3/data.txt".format(unique_database)) |
| self.filesystem_client.delete_file_dir( |
| "test-warehouse/{0}.db/data_t3".format(unique_database), recursive=True) |
| |
| @SkipIfADLS.eventually_consistent |
| @SkipIfLocal.hdfs_client |
| def test_drop_cleans_hdfs_dirs(self, unique_database): |
| self.client.execute('use default') |
| # Verify the db directory exists |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/".format(unique_database)) |
| |
| self.client.execute("create table {0}.t1(i int)".format(unique_database)) |
| # Verify the table directory exists |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t1/".format(unique_database)) |
| |
| # Dropping the table removes the table's directory and preserves the db's directory |
| self.client.execute("drop table {0}.t1".format(unique_database)) |
| assert not self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t1/".format(unique_database)) |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/".format(unique_database)) |
| |
| # Dropping the db removes the db's directory |
| self.client.execute("drop database {0}".format(unique_database)) |
| assert not self.filesystem_client.exists( |
| "test-warehouse/{0}.db/".format(unique_database)) |
| |
| # Dropping the db using "cascade" removes all tables' and db's directories |
| # but keeps the external tables' directory |
| self._create_db(unique_database) |
| self.client.execute("create table {0}.t1(i int)".format(unique_database)) |
| self.client.execute("create table {0}.t2(i int)".format(unique_database)) |
| result = self.client.execute("create external table {0}.t3(i int) " |
| "location '{1}/{0}/t3/'".format(unique_database, WAREHOUSE)) |
| self.client.execute("drop database {0} cascade".format(unique_database)) |
| assert not self.filesystem_client.exists( |
| "test-warehouse/{0}.db/".format(unique_database)) |
| assert not self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t1/".format(unique_database)) |
| assert not self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t2/".format(unique_database)) |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}/t3/".format(unique_database)) |
| self.filesystem_client.delete_file_dir( |
| "test-warehouse/{0}/t3/".format(unique_database), recursive=True) |
| assert not self.filesystem_client.exists( |
| "test-warehouse/{0}/t3/".format(unique_database)) |
| # Re-create database to make unique_database teardown succeed. |
| self._create_db(unique_database) |
| |
| @SkipIfADLS.eventually_consistent |
| @SkipIfLocal.hdfs_client |
| def test_truncate_cleans_hdfs_files(self, unique_database): |
| # Verify the db directory exists |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/".format(unique_database)) |
| |
| self.client.execute("create table {0}.t1(i int)".format(unique_database)) |
| # Verify the table directory exists |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t1/".format(unique_database)) |
| |
| try: |
| # If we're testing S3, we want the staging directory to be created. |
| self.client.execute("set s3_skip_insert_staging=false") |
| # Should have created one file in the table's dir |
| self.client.execute("insert into {0}.t1 values (1)".format(unique_database)) |
| assert len(self.filesystem_client.ls( |
| "test-warehouse/{0}.db/t1/".format(unique_database))) == 2 |
| |
| # Truncating the table removes the data files and preserves the table's directory |
| self.client.execute("truncate table {0}.t1".format(unique_database)) |
| assert len(self.filesystem_client.ls( |
| "test-warehouse/{0}.db/t1/".format(unique_database))) == 1 |
| |
| self.client.execute( |
| "create table {0}.t2(i int) partitioned by (p int)".format(unique_database)) |
| # Verify the table directory exists |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t2/".format(unique_database)) |
| |
| # Should have created the partition dir, which should contain exactly one file |
| self.client.execute( |
| "insert into {0}.t2 partition(p=1) values (1)".format(unique_database)) |
| assert len(self.filesystem_client.ls( |
| "test-warehouse/{0}.db/t2/p=1".format(unique_database))) == 1 |
| |
| # Truncating the table removes the data files and preserves the partition's directory |
| self.client.execute("truncate table {0}.t2".format(unique_database)) |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/t2/p=1".format(unique_database)) |
| assert len(self.filesystem_client.ls( |
| "test-warehouse/{0}.db/t2/p=1".format(unique_database))) == 0 |
| finally: |
| # Reset to its default value. |
| self.client.execute("set s3_skip_insert_staging=true") |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_truncate_table(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/truncate-table', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_create_database(self, vector, unique_database): |
| # The unique_database provides the .test a unique database name which allows |
| # us to run this test in parallel with others. |
| self.run_test_case('QueryTest/create-database', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| def test_comment_on_database(self, vector, unique_database): |
| comment = self._get_db_comment(unique_database) |
| assert '' == comment |
| |
| self.client.execute("comment on database {0} is 'comment'".format(unique_database)) |
| comment = self._get_db_comment(unique_database) |
| assert 'comment' == comment |
| |
| self.client.execute("comment on database {0} is ''".format(unique_database)) |
| comment = self._get_db_comment(unique_database) |
| assert '' == comment |
| |
| self.client.execute("comment on database {0} is '\\'comment\\''".format(unique_database)) |
| comment = self._get_db_comment(unique_database) |
| assert "\\'comment\\'" == comment |
| |
| self.client.execute("comment on database {0} is null".format(unique_database)) |
| comment = self._get_db_comment(unique_database) |
| assert '' == comment |
| |
| def test_alter_database_set_owner(self, vector, unique_database): |
| self.client.execute("alter database {0} set owner user foo_user".format( |
| unique_database)) |
| properties = self._get_db_owner_properties(unique_database) |
| assert len(properties) == 1 |
| assert {'foo_user': 'USER'} == properties |
| |
| self.client.execute("alter database {0} set owner role foo_role".format( |
| unique_database)) |
| properties = self._get_db_owner_properties(unique_database) |
| assert len(properties) == 1 |
| assert {'foo_role': 'ROLE'} == properties |
| |
| def test_metadata_after_alter_database(self, vector, unique_database): |
| self.client.execute("create table {0}.tbl (i int)".format(unique_database)) |
| self.client.execute("create function {0}.f() returns int " |
| "location '{1}/libTestUdfs.so' symbol='NoArgs'" |
| .format(unique_database, WAREHOUSE)) |
| self.client.execute("alter database {0} set owner user foo_user".format( |
| unique_database)) |
| table_names = self.client.execute("show tables in {0}".format( |
| unique_database)).get_data() |
| assert "tbl" == table_names |
| func_names = self.client.execute("show functions in {0}".format( |
| unique_database)).get_data() |
| assert "INT\tf()\tNATIVE\ttrue" == func_names |
| |
| def test_alter_table_set_owner(self, vector, unique_database): |
| table_name = "{0}.test_owner_tbl".format(unique_database) |
| self.client.execute("create table {0}(i int)".format(table_name)) |
| self.client.execute("alter table {0} set owner user foo_user".format(table_name)) |
| owner = self._get_table_or_view_owner(table_name) |
| assert ('foo_user', 'USER') == owner |
| |
| self.client.execute("alter table {0} set owner role foo_role".format(table_name)) |
| owner = self._get_table_or_view_owner(table_name) |
| assert ('foo_role', 'ROLE') == owner |
| |
| def test_alter_view_set_owner(self, vector, unique_database): |
| view_name = "{0}.test_owner_tbl".format(unique_database) |
| self.client.execute("create view {0} as select 1".format(view_name)) |
| self.client.execute("alter view {0} set owner user foo_user".format(view_name)) |
| owner = self._get_table_or_view_owner(view_name) |
| assert ('foo_user', 'USER') == owner |
| |
| self.client.execute("alter view {0} set owner role foo_role".format(view_name)) |
| owner = self._get_table_or_view_owner(view_name) |
| assert ('foo_role', 'ROLE') == owner |
| |
| # There is a query in QueryTest/create-table that references nested types, which is not |
| # supported if old joins and aggs are enabled. Since we do not get any meaningful |
| # additional coverage by running a DDL test under the old aggs and joins, it can be |
| # skipped. |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_create_table(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/create-table', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_create_table_like_table(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/create-table-like-table', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_create_table_like_file(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/create-table-like-file', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @SkipIfHive2.orc |
| @SkipIfS3.hive |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_create_table_like_file_orc(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/create-table-like-file-orc', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_create_table_as_select(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/create-table-as-select', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| @SkipIfKudu.no_hybrid_clock |
| def test_create_kudu(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT" |
| self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| def test_comment_on_table(self, vector, unique_database): |
| table = '{0}.comment_table'.format(unique_database) |
| self.client.execute("create table {0} (i int)".format(table)) |
| |
| comment = self._get_table_or_view_comment(table) |
| assert comment is None |
| |
| self.client.execute("comment on table {0} is 'comment'".format(table)) |
| comment = self._get_table_or_view_comment(table) |
| assert "comment" == comment |
| |
| self.client.execute("comment on table {0} is ''".format(table)) |
| comment = self._get_table_or_view_comment(table) |
| assert "" == comment |
| |
| self.client.execute("comment on table {0} is '\\'comment\\''".format(table)) |
| comment = self._get_table_or_view_comment(table) |
| assert "\\\\'comment\\\\'" == comment |
| |
| self.client.execute("comment on table {0} is null".format(table)) |
| comment = self._get_table_or_view_comment(table) |
| assert comment is None |
| |
| def test_comment_on_view(self, vector, unique_database): |
| view = '{0}.comment_view'.format(unique_database) |
| self.client.execute("create view {0} as select 1".format(view)) |
| |
| comment = self._get_table_or_view_comment(view) |
| assert comment is None |
| |
| self.client.execute("comment on view {0} is 'comment'".format(view)) |
| comment = self._get_table_or_view_comment(view) |
| assert "comment" == comment |
| |
| self.client.execute("comment on view {0} is ''".format(view)) |
| comment = self._get_table_or_view_comment(view) |
| assert "" == comment |
| |
| self.client.execute("comment on view {0} is '\\'comment\\''".format(view)) |
| comment = self._get_table_or_view_comment(view) |
| assert "\\\\'comment\\\\'" == comment |
| |
| self.client.execute("comment on view {0} is null".format(view)) |
| comment = self._get_table_or_view_comment(view) |
| assert comment is None |
| |
| def test_comment_on_column(self, vector, unique_database): |
| table = "{0}.comment_table".format(unique_database) |
| self.client.execute("create table {0} (i int) partitioned by (j int)".format(table)) |
| |
| comment = self._get_column_comment(table, 'i') |
| assert '' == comment |
| |
| # Updating comment on a regular column. |
| self.client.execute("comment on column {0}.i is 'comment 1'".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "comment 1" == comment |
| |
| # Updating comment on a partition column. |
| self.client.execute("comment on column {0}.j is 'comment 2'".format(table)) |
| comment = self._get_column_comment(table, 'j') |
| assert "comment 2" == comment |
| |
| self.client.execute("comment on column {0}.i is ''".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "" == comment |
| |
| self.client.execute("comment on column {0}.i is '\\'comment\\''".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "\\'comment\\'" == comment |
| |
| self.client.execute("comment on column {0}.i is null".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "" == comment |
| |
| view = "{0}.comment_view".format(unique_database) |
| self.client.execute("create view {0}(i) as select 1".format(view)) |
| |
| comment = self._get_column_comment(view, 'i') |
| assert "" == comment |
| |
| self.client.execute("comment on column {0}.i is 'comment'".format(view)) |
| comment = self._get_column_comment(view, 'i') |
| assert "comment" == comment |
| |
| self.client.execute("comment on column {0}.i is ''".format(view)) |
| comment = self._get_column_comment(view, 'i') |
| assert "" == comment |
| |
| self.client.execute("comment on column {0}.i is '\\'comment\\''".format(view)) |
| comment = self._get_column_comment(view, 'i') |
| assert "\\'comment\\'" == comment |
| |
| self.client.execute("comment on column {0}.i is null".format(view)) |
| comment = self._get_column_comment(view, 'i') |
| assert "" == comment |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_sync_ddl_drop(self, vector, unique_database): |
| """Verifies the catalog gets updated properly when dropping objects with sync_ddl |
| enabled""" |
| self.client.set_configuration({'sync_ddl': 1}) |
| # Drop the database immediately after creation (within a statestore heartbeat) and |
| # verify the catalog gets updated properly. |
| self.client.execute("drop database {0}".format(unique_database)) |
| assert unique_database not in self.all_db_names() |
| # Re-create database to make unique_database teardown succeed. |
| self._create_db(unique_database) |
| |
| # TODO: don't use hdfs_client |
| @SkipIfLocal.hdfs_client |
| @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2) |
| def test_alter_table(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| |
| # Create an unpartitioned table to get a filesystem directory that does not |
| # use the (key=value) format. The directory is automatically cleanup up |
| # by the unique_database fixture. |
| self.client.execute("create table {0}.part_data (i int)".format(unique_database)) |
| assert self.filesystem_client.exists( |
| "test-warehouse/{0}.db/part_data".format(unique_database)) |
| self.filesystem_client.create_file( |
| "test-warehouse/{0}.db/part_data/data.txt".format(unique_database), |
| file_data='1984') |
| self.run_test_case('QueryTest/alter-table', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @SkipIf.not_hdfs |
| @SkipIfLocal.hdfs_client |
| @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2) |
| def test_alter_table_hdfs_caching(self, vector, unique_database): |
| self.run_test_case('QueryTest/alter-table-hdfs-caching', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_alter_set_column_stats(self, vector, unique_database): |
| self.run_test_case('QueryTest/alter-table-set-column-stats', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @SkipIfLocal.hdfs_client |
| def test_drop_partition_with_purge(self, vector, unique_database): |
| """Verfies whether alter <tbl> drop partition purge actually skips trash""" |
| self.client.execute( |
| "create table {0}.t1(i int) partitioned by (j int)".format(unique_database)) |
| # Add two partitions (j=1) and (j=2) to table t1 |
| self.client.execute("alter table {0}.t1 add partition(j=1)".format(unique_database)) |
| self.client.execute("alter table {0}.t1 add partition(j=2)".format(unique_database)) |
| self.filesystem_client.create_file(\ |
| "test-warehouse/{0}.db/t1/j=1/j1.txt".format(unique_database), file_data='j1') |
| self.filesystem_client.create_file(\ |
| "test-warehouse/{0}.db/t1/j=2/j2.txt".format(unique_database), file_data='j2') |
| # Drop the partition (j=1) without purge and make sure it exists in trash |
| self.client.execute("alter table {0}.t1 drop partition(j=1)".format(unique_database)) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=1/j1.txt".\ |
| format(unique_database)) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=1".\ |
| format(unique_database)) |
| assert self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1/j1.txt".\ |
| format(getpass.getuser(), unique_database)) |
| assert self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=1".\ |
| format(getpass.getuser(), unique_database)) |
| # Drop the partition (with purge) and make sure it doesn't exist in trash |
| self.client.execute("alter table {0}.t1 drop partition(j=2) purge".\ |
| format(unique_database)); |
| if not IS_S3 and not IS_ADLS: |
| # In S3, deletes are eventual. So even though we dropped the partition, the files |
| # belonging to this partition may still be visible for some unbounded time. This |
| # happens only with PURGE. A regular DROP TABLE is just a copy of files which is |
| # consistent. |
| # The ADLS Python client is not strongly consistent, so these files may still be |
| # visible after a DROP. (Remove after IMPALA-5335 is resolved) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2/j2.txt".\ |
| format(unique_database)) |
| assert not self.filesystem_client.exists("test-warehouse/{0}.db/t1/j=2".\ |
| format(unique_database)) |
| assert not self.filesystem_client.exists(\ |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2".\ |
| format(getpass.getuser(), unique_database)) |
| assert not self.filesystem_client.exists( |
| "user/{0}/.Trash/Current/test-warehouse/{1}.db/t1/j=2/j2.txt".\ |
| format(getpass.getuser(), unique_database)) |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_views_ddl(self, vector, unique_database): |
| vector.get_value('exec_option')['abort_on_error'] = False |
| self.run_test_case('QueryTest/views-ddl', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @UniqueDatabase.parametrize() |
| def test_view_hints(self, vector, unique_database): |
| # Test that plan hints are stored in the view's comment field; this should work |
| # regardless of how Hive formats the output. Getting this to work with the |
| # automated test case runner is rather difficult, so verify directly. There |
| # should be two # of each join hint, one for the original text, one for the expanded |
| self.client.execute(""" |
| create view {0}.hints_test as |
| select /* +straight_join */ a.* from functional.alltypestiny a |
| inner join /* +broadcast */ functional.alltypes b on a.id = b.id |
| inner join /* +shuffle */ functional.alltypessmall c on b.id = c.id |
| """.format(unique_database)) |
| results = self.execute_query("describe formatted %s.hints_test" % unique_database) |
| sj, bc, shuf = 0,0,0 |
| for row in results.data: |
| sj += '-- +straight_join' in row |
| bc += '-- +broadcast' in row |
| shuf += '-- +shuffle' in row |
| assert sj == 2 |
| assert bc == 2 |
| assert shuf == 2 |
| |
| # Test querying the hinted view. |
| results = self.execute_query("select count(*) from %s.hints_test" % unique_database) |
| assert results.success |
| assert len(results.data) == 1 |
| assert results.data[0] == '8' |
| |
| # Test the plan to make sure hints were applied correctly |
| plan = self.execute_query("explain select * from %s.hints_test" % unique_database, |
| query_options={'explain_level':0}) |
| plan_match = """PLAN-ROOT SINK |
| 08:EXCHANGE [UNPARTITIONED] |
| 04:HASH JOIN [INNER JOIN, PARTITIONED] |
| |--07:EXCHANGE [HASH(c.id)] |
| | 02:SCAN {filesystem_name} [functional.alltypessmall c] |
| 06:EXCHANGE [HASH(b.id)] |
| 03:HASH JOIN [INNER JOIN, BROADCAST] |
| |--05:EXCHANGE [BROADCAST] |
| | 01:SCAN {filesystem_name} [functional.alltypes b] |
| 00:SCAN {filesystem_name} [functional.alltypestiny a]""" |
| assert plan_match.format(filesystem_name=FILESYSTEM_NAME) in '\n'.join(plan.data) |
| |
| def _verify_describe_view(self, vector, view_name, expected_substr): |
| """ |
| Verify across all impalads that the view 'view_name' has the given substring in its |
| expanded SQL. |
| |
| If SYNC_DDL is enabled, the verification should complete immediately. Otherwise, |
| loops waiting for the expected condition to pass. |
| """ |
| if vector.get_value('exec_option')['sync_ddl']: |
| num_attempts = 1 |
| else: |
| num_attempts = 60 |
| for impalad in ImpalaCluster.get_e2e_test_cluster().impalads: |
| client = impalad.service.create_beeswax_client() |
| try: |
| for attempt in itertools.count(1): |
| assert attempt <= num_attempts, "ran out of attempts" |
| try: |
| result = self.execute_query_expect_success( |
| client, "describe formatted %s" % view_name) |
| exp_line = [l for l in result.data if 'View Expanded' in l][0] |
| except ImpalaBeeswaxException, e: |
| # In non-SYNC_DDL tests, it's OK to get a "missing view" type error |
| # until the metadata propagates. |
| exp_line = "Exception: %s" % e |
| if expected_substr in exp_line.lower(): |
| return |
| time.sleep(1) |
| finally: |
| client.close() |
| |
| |
| def test_views_describe(self, vector, unique_database): |
| # IMPALA-6896: Tests that altered views can be described by all impalads. |
| impala_cluster = ImpalaCluster.get_e2e_test_cluster() |
| impalads = impala_cluster.impalads |
| view_name = "%s.test_describe_view" % unique_database |
| query_opts = vector.get_value('exec_option') |
| first_client = impalads[0].service.create_beeswax_client() |
| try: |
| # Create a view and verify it's visible. |
| self.execute_query_expect_success(first_client, |
| "create view {0} as " |
| "select * from functional.alltypes" |
| .format(view_name), query_opts) |
| self._verify_describe_view(vector, view_name, "select * from functional.alltypes") |
| |
| # Alter the view and verify the alter is visible. |
| self.execute_query_expect_success(first_client, |
| "alter view {0} as " |
| "select * from functional.alltypesagg" |
| .format(view_name), query_opts) |
| self._verify_describe_view(vector, view_name, |
| "select * from functional.alltypesagg") |
| finally: |
| first_client.close() |
| |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_functions_ddl(self, vector, unique_database): |
| self.run_test_case('QueryTest/functions-ddl', vector, use_db=unique_database, |
| multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| @SkipIfLocal.hdfs_client |
| def test_create_alter_bulk_partition(self, vector, unique_database): |
| # Change the scale depending on the exploration strategy, with 50 partitions this |
| # test runs a few minutes, with 10 partitions it takes ~50s for two configurations. |
| num_parts = 50 if self.exploration_strategy() == 'exhaustive' else 10 |
| fq_tbl_name = unique_database + ".part_test_tbl" |
| self.client.execute("create table {0}(i int) partitioned by(j int, s string) " |
| "location '{1}/{0}'".format(fq_tbl_name, WAREHOUSE)) |
| |
| # Add some partitions (first batch of two) |
| for i in xrange(num_parts / 5): |
| start = time.time() |
| self.client.execute( |
| "alter table {0} add partition(j={1}, s='{1}')".format(fq_tbl_name, i)) |
| LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start)) |
| |
| # Modify one of the partitions |
| self.client.execute("alter table {0} partition(j=1, s='1')" |
| " set fileformat parquetfile".format(fq_tbl_name)) |
| |
| # Alter one partition to a non-existent location twice (IMPALA-741) |
| self.filesystem_client.delete_file_dir("tmp/dont_exist1/", recursive=True) |
| self.filesystem_client.delete_file_dir("tmp/dont_exist2/", recursive=True) |
| |
| self.execute_query_expect_success(self.client, |
| "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist1'" |
| .format(fq_tbl_name, WAREHOUSE)) |
| self.execute_query_expect_success(self.client, |
| "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist2'" |
| .format(fq_tbl_name, WAREHOUSE)) |
| |
| # Add some more partitions |
| for i in xrange(num_parts / 5, num_parts): |
| start = time.time() |
| self.client.execute( |
| "alter table {0} add partition(j={1},s='{1}')".format(fq_tbl_name, i)) |
| LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start)) |
| |
| # Insert data and verify it shows up. |
| self.client.execute( |
| "insert into table {0} partition(j=1, s='1') select 1".format(fq_tbl_name)) |
| assert '1' == self.execute_scalar("select count(*) from {0}".format(fq_tbl_name)) |
| |
| def test_alter_table_create_many_partitions(self, vector, unique_database): |
| """ |
| Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC |
| batch size works, in that it creates all the underlying partitions. |
| """ |
| self.client.execute( |
| "create table {0}.t(i int) partitioned by (p int)".format(unique_database)) |
| MAX_PARTITION_UPDATES_PER_RPC = 500 |
| alter_stmt = "alter table {0}.t add ".format(unique_database) + " ".join( |
| "partition(p=%d)" % (i,) for i in xrange(MAX_PARTITION_UPDATES_PER_RPC + 2)) |
| self.client.execute(alter_stmt) |
| partitions = self.client.execute("show partitions {0}.t".format(unique_database)) |
| # Show partitions will contain partition HDFS paths, which we expect to contain |
| # "p=val" subdirectories for each partition. The regexp finds all the "p=[0-9]*" |
| # paths, converts them to integers, and checks that wehave all the ones we |
| # expect. |
| PARTITION_RE = re.compile("p=([0-9]+)") |
| assert map(int, PARTITION_RE.findall(str(partitions))) == \ |
| range(MAX_PARTITION_UPDATES_PER_RPC + 2) |
| |
| def test_create_alter_tbl_properties(self, vector, unique_database): |
| fq_tbl_name = unique_database + ".test_alter_tbl" |
| |
| # Specify TBLPROPERTIES and SERDEPROPERTIES at CREATE time |
| self.client.execute("""create table {0} (i int) |
| with serdeproperties ('s1'='s2', 's3'='s4') |
| tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name)) |
| properties = self._get_tbl_properties(fq_tbl_name) |
| |
| if HIVE_MAJOR_VERSION > 2: |
| assert properties['OBJCAPABILITIES'] == 'EXTREAD,EXTWRITE' |
| assert properties['TRANSLATED_TO_EXTERNAL'] == 'TRUE' |
| assert properties['external.table.purge'] == 'TRUE' |
| assert properties['EXTERNAL'] == 'TRUE' |
| del properties['OBJCAPABILITIES'] |
| del properties['TRANSLATED_TO_EXTERNAL'] |
| del properties['external.table.purge'] |
| del properties['EXTERNAL'] |
| assert len(properties) == 2 |
| # The transient_lastDdlTime is variable, so don't verify the value. |
| assert 'transient_lastDdlTime' in properties |
| del properties['transient_lastDdlTime'] |
| assert {'p1': 'v1'} == properties |
| |
| properties = self._get_serde_properties(fq_tbl_name) |
| assert {'s1': 's2', 's3': 's4'} == properties |
| |
| # Modify the SERDEPROPERTIES using ALTER TABLE SET. |
| self.client.execute("alter table {0} set serdeproperties " |
| "('s1'='new', 's5'='s6')".format(fq_tbl_name)) |
| properties = self._get_serde_properties(fq_tbl_name) |
| assert {'s1': 'new', 's3': 's4', 's5': 's6'} == properties |
| |
| # Modify the TBLPROPERTIES using ALTER TABLE SET. |
| self.client.execute("alter table {0} set tblproperties " |
| "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name)) |
| properties = self._get_tbl_properties(fq_tbl_name) |
| |
| if HIVE_MAJOR_VERSION > 2: |
| assert 'OBJCAPABILITIES' in properties |
| assert 'transient_lastDdlTime' in properties |
| assert properties['p1'] == 'v1' |
| assert properties['prop1'] == 'val1' |
| assert properties['p2'] == 'val3' |
| assert properties[''] == '' |
| |
| @SkipIfHive2.acid |
| def test_create_insertonly_tbl(self, vector, unique_database): |
| insertonly_tbl = unique_database + ".test_insertonly" |
| self.client.execute("""create table {0} (coli int) stored as parquet tblproperties( |
| 'transactional'='true', 'transactional_properties'='insert_only')""" |
| .format(insertonly_tbl)) |
| properties = self._get_tbl_properties(insertonly_tbl) |
| assert properties['OBJCAPABILITIES'] == 'HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE' |
| |
| def test_alter_tbl_properties_reload(self, vector, unique_database): |
| # IMPALA-8734: Force a table schema reload when setting table properties. |
| tbl_name = "test_tbl" |
| self.execute_query_expect_success(self.client, "create table {0}.{1} (c1 string)" |
| .format(unique_database, tbl_name)) |
| self.filesystem_client.create_file("test-warehouse/{0}.db/{1}/f". |
| format(unique_database, tbl_name), |
| file_data="\nfoo\n") |
| self.execute_query_expect_success(self.client, |
| "alter table {0}.{1} set tblproperties" |
| "('serialization.null.format'='foo')" |
| .format(unique_database, tbl_name)) |
| result = self.execute_query_expect_success(self.client, |
| "select * from {0}.{1}" |
| .format(unique_database, tbl_name)) |
| assert len(result.data) == 2 |
| assert result.data[0] == '' |
| assert result.data[1] == 'NULL' |
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| def test_partition_ddl_predicates(self, vector, unique_database): |
| self.run_test_case('QueryTest/partition-ddl-predicates-all-fs', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| if IS_HDFS: |
| self.run_test_case('QueryTest/partition-ddl-predicates-hdfs-only', vector, |
| use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector)) |
| |
| def test_create_table_file_format(self, vector, unique_database): |
| # When default_file_format query option is not specified, the default table file |
| # format is TEXT. |
| text_table = "{0}.text_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, "create table {0}(i int)".format(text_table)) |
| result = self.execute_query_expect_success( |
| self.client, "show create table {0}".format(text_table)) |
| assert any("TEXTFILE" in x for x in result.data) |
| |
| self.execute_query_expect_failure( |
| self.client, "create table {0}.foobar_tbl".format(unique_database), |
| {"default_file_format": "foobar"}) |
| |
| parquet_table = "{0}.parquet_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, "create table {0}(i int)".format(parquet_table), |
| {"default_file_format": "parquet"}) |
| result = self.execute_query_expect_success( |
| self.client, "show create table {0}".format(parquet_table)) |
| assert any("PARQUET" in x for x in result.data) |
| |
| # The table created should still be ORC even though the default_file_format query |
| # option is set to parquet. |
| orc_table = "{0}.orc_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, |
| "create table {0}(i int) stored as orc".format(orc_table), |
| {"default_file_format": "parquet"}) |
| result = self.execute_query_expect_success( |
| self.client, "show create table {0}".format(orc_table)) |
| assert any("ORC" in x for x in result.data) |
| |
| @SkipIfHive2.acid |
| def test_create_table_transactional_type(self, vector, unique_database): |
| # When default_transactional_type query option is not specified, the transaction |
| # related table properties are not set. |
| non_acid_table = "{0}.non_acid_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, "create table {0}(i int)".format(non_acid_table), |
| {"default_transactional_type": "none"}) |
| props = self._get_properties("Table Parameters", non_acid_table) |
| assert "transactional" not in props |
| assert "transactional_properties" not in props |
| |
| # Create table as "insert_only" transactional. |
| insert_only_acid_table = "{0}.insert_only_acid_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, "create table {0}(i int)".format(insert_only_acid_table), |
| {"default_transactional_type": "insert_only"}) |
| props = self._get_properties("Table Parameters", insert_only_acid_table) |
| assert props["transactional"] == "true" |
| assert props["transactional_properties"] == "insert_only" |
| |
| # default_transactional_type query option should not affect external tables |
| external_table = "{0}.external_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, "create external table {0}(i int)".format(external_table), |
| {"default_transactional_type": "insert_only"}) |
| props = self._get_properties("Table Parameters", external_table) |
| assert "transactional" not in props |
| assert "transactional_properties" not in props |
| |
| # default_transactional_type query option should not affect Kudu tables. |
| kudu_table = "{0}.kudu_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, |
| "create table {0}(i int primary key) stored as kudu".format(kudu_table), |
| {"default_transactional_type": "insert_only"}) |
| props = self._get_properties("Table Parameters", kudu_table) |
| assert "transactional" not in props |
| assert "transactional_properties" not in props |
| |
| # default_transactional_type query option should have no effect when transactional |
| # table properties are set manually. |
| manual_acid_table = "{0}.manual_acid_tbl".format(unique_database) |
| self.execute_query_expect_success( |
| self.client, "create table {0}(i int) TBLPROPERTIES ('transactional'='false')" |
| .format(manual_acid_table), |
| {"default_transactional_type": "insert_only"}) |
| props = self._get_properties("Table Parameters", manual_acid_table) |
| assert "transactional" not in props |
| assert "transactional_properties" not in props |
| |
| def test_kudu_column_comment(self, vector, unique_database): |
| table = "{0}.kudu_table0".format(unique_database) |
| self.client.execute("create table {0}(x int comment 'x' primary key) \ |
| stored as kudu".format(table)) |
| comment = self._get_column_comment(table, 'x') |
| assert "x" == comment |
| |
| table = "{0}.kudu_table".format(unique_database) |
| self.client.execute("create table {0}(i int primary key) stored as kudu" |
| .format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "" == comment |
| |
| self.client.execute("comment on column {0}.i is 'comment1'".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "comment1" == comment |
| |
| self.client.execute("comment on column {0}.i is ''".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "" == comment |
| |
| self.client.execute("comment on column {0}.i is 'comment2'".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "comment2" == comment |
| |
| self.client.execute("comment on column {0}.i is null".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "" == comment |
| |
| self.client.execute("alter table {0} alter column i set comment 'comment3'" |
| .format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "comment3" == comment |
| |
| self.client.execute("alter table {0} alter column i set comment ''".format(table)) |
| comment = self._get_column_comment(table, 'i') |
| assert "" == comment |
| |
| self.client.execute("alter table {0} add columns (j int comment 'comment4')" |
| .format(table)) |
| comment = self._get_column_comment(table, 'j') |
| assert "comment4" == comment |
| |
| # IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache. |
| class TestLibCache(TestDdlBase): |
| @classmethod |
| def get_workload(self): |
| return 'functional-query' |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestLibCache, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension()) |
| |
| def test_create_drop_function(self, vector, unique_database): |
| """This will create, run, and drop the same function repeatedly, exercising the |
| lib cache mechanism. |
| """ |
| create_fn_stmt = ("create function {0}.f() returns int " |
| "location '{1}/libTestUdfs.so' symbol='NoArgs'" |
| .format(unique_database, WAREHOUSE)) |
| select_stmt = ("select {0}.f() from functional.alltypes limit 10" |
| .format(unique_database)) |
| drop_fn_stmt = "drop function %s {0}.f()".format(unique_database) |
| self.create_drop_ddl(vector, [create_fn_stmt], [drop_fn_stmt], select_stmt) |
| |
| # Run serially because this test inspects global impalad metrics. |
| # TODO: The metrics checks could be relaxed to enable running this test in |
| # parallel, but that might need a more general wait_for_metric_value(). |
| @SkipIfCatalogV2.data_sources_unsupported() |
| @pytest.mark.execute_serially |
| def test_create_drop_data_src(self, vector, unique_database): |
| """This will create, run, and drop the same data source repeatedly, exercising |
| the lib cache mechanism. |
| """ |
| data_src_name = unique_database + "_datasrc" |
| create_ds_stmt = ("CREATE DATA SOURCE {0} " |
| "LOCATION '{1}/data-sources/test-data-source.jar' " |
| "CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' " |
| "API_VERSION 'V1'".format(data_src_name, WAREHOUSE)) |
| create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) " |
| "PRODUCED BY DATA SOURCE {1}('dummy_init_string')")\ |
| .format(unique_database, data_src_name) |
| drop_ds_stmt = "drop data source %s {0}".format(data_src_name) |
| drop_tbl_stmt = "drop table %s {0}.data_src_tbl".format(unique_database) |
| select_stmt = "select * from {0}.data_src_tbl limit 1".format(unique_database) |
| class_cache_hits_metric = "external-data-source.class-cache.hits" |
| class_cache_misses_metric = "external-data-source.class-cache.misses" |
| |
| create_stmts = [create_ds_stmt, create_tbl_stmt] |
| drop_stmts = [drop_tbl_stmt, drop_ds_stmt] |
| |
| # The ImpaladService is used to capture metrics |
| service = self.impalad_test_service |
| |
| # Initial metric values |
| class_cache_hits = service.get_metric_value(class_cache_hits_metric) |
| class_cache_misses = service.get_metric_value(class_cache_misses_metric) |
| # Test with 1 node so we can check the metrics on only the coordinator |
| vector.get_value('exec_option')['num_nodes'] = 1 |
| num_iterations = 2 |
| self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, num_iterations) |
| |
| # Check class cache metrics. Shouldn't have any new cache hits, there should be |
| # 2 cache misses for every iteration (jar is loaded by both the FE and BE). |
| expected_cache_misses = class_cache_misses + (num_iterations * 2) |
| service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits) |
| service.wait_for_metric_value(class_cache_misses_metric, |
| expected_cache_misses) |
| |
| # Test with a table that caches the class |
| create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) " |
| "PRODUCED BY DATA SOURCE {1}('CACHE_CLASS::dummy_init_string')")\ |
| .format(unique_database, data_src_name) |
| create_stmts = [create_ds_stmt, create_tbl_stmt] |
| # Run once before capturing metrics because the class already may be cached from |
| # a previous test run. |
| # TODO: Provide a way to clear the cache |
| self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1) |
| |
| # Capture metric values and run again, should hit the cache. |
| class_cache_hits = service.get_metric_value(class_cache_hits_metric) |
| class_cache_misses = service.get_metric_value(class_cache_misses_metric) |
| self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1) |
| service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits + 2) |
| service.wait_for_metric_value(class_cache_misses_metric, class_cache_misses) |
| |
| def create_drop_ddl(self, vector, create_stmts, drop_stmts, select_stmt, |
| num_iterations=3): |
| """Helper method to run CREATE/DROP DDL commands repeatedly and exercise the lib |
| cache. create_stmts is the list of CREATE statements to be executed in order |
| drop_stmts is the list of DROP statements to be executed in order. Each statement |
| should have a '%s' placeholder to insert "IF EXISTS" or "". The select_stmt is just a |
| single statement to test after executing the CREATE statements. |
| TODO: it's hard to tell that the cache is working (i.e. if it did nothing to drop |
| the cache, these tests would still pass). Testing that is a bit harder and requires |
| us to update the udf binary in the middle. |
| """ |
| self.client.set_configuration(vector.get_value("exec_option")) |
| for drop_stmt in drop_stmts: self.client.execute(drop_stmt % ("if exists")) |
| for i in xrange(0, num_iterations): |
| for create_stmt in create_stmts: self.client.execute(create_stmt) |
| self.client.execute(select_stmt) |
| for drop_stmt in drop_stmts: self.client.execute(drop_stmt % ("")) |