| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import getpass |
| import itertools |
| import pytest |
| import re |
| import time |
| import threading |
| from multiprocessing.pool import ThreadPool |
| from multiprocessing import TimeoutError |
| |
| from copy import deepcopy |
| from tests.metadata.test_ddl_base import TestDdlBase |
| from tests.common.environ import (HIVE_MAJOR_VERSION) |
| from tests.common.file_utils import create_table_from_orc |
| from tests.common.impala_connection import ( |
| FINISHED, INITIALIZED, IMPALA_CONNECTION_EXCEPTION, PENDING, RUNNING) |
| from tests.common.impala_test_suite import LOG |
| from tests.common.parametrize import UniqueDatabase |
| from tests.common.skip import ( |
| SkipIfCalcite, |
| SkipIfDockerizedCluster, |
| SkipIfFS, |
| SkipIfHive2, |
| SkipIfKudu, |
| SkipIfLocal) |
| from tests.common.test_dimensions import create_single_exec_option_dimension |
| from tests.common.test_dimensions import (create_exec_option_dimension, |
| create_client_protocol_dimension, create_exec_option_dimension_from_dict) |
| from tests.common.test_vector import ImpalaTestDimension |
| from tests.util.filesystem_utils import ( |
| get_fs_path, |
| WAREHOUSE, |
| IS_HDFS, |
| IS_S3, |
| IS_ADLS, |
| IS_OZONE, |
| FILESYSTEM_NAME) |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.util.filesystem_utils import FILESYSTEM_PREFIX |
| from tests.util.parse_util import parse_duration_string_ms |
| from tests.util.shell_util import dump_server_stacktraces |
| |
| |
def get_trash_path(bucket, path):
  """Return the trash location that corresponds to 'path' inside 'bucket'.

  On Ozone the trash directory is located inside the bucket and the result is passed
  through get_fs_path(). On every other filesystem the trash lives below the current
  user's '/user/<name>' directory and the path is returned as is.
  """
  user = getpass.getuser()
  if not IS_OZONE:
    return '/user/{0}/.Trash/Current/{1}/{2}'.format(user, bucket, path)
  ozone_trash = '/{0}/.Trash/{1}/Current/{2}'.format(bucket, user, path)
  return get_fs_path(ozone_trash)
| |
| |
| # Validates DDL statements (create, drop) |
| class TestDdlStatements(TestDdlBase): |
| @SkipIfLocal.hdfs_client |
def test_drop_table_with_purge(self, unique_database):
  """Check that a plain DROP TABLE moves managed table data to the trash, that
  DROP TABLE <tbl> PURGE leaves nothing in the trash, and that dropping an external
  table with PURGE keeps its data file."""
  run = self.client.execute
  fs = self.filesystem_client
  db_dir = "{0}/{1}.db".format(WAREHOUSE, unique_database)

  # Two managed tables, each seeded with a single data file in its directory.
  for tbl in ("t1", "t2"):
    run("create table {0}.{1}(i int)".format(unique_database, tbl))
  for tbl in ("t1", "t2"):
    fs.create_file("{0}/{1}/{1}.txt".format(db_dir, tbl), file_data=tbl)

  # Without purge the table directory disappears and its content shows up in trash.
  run("drop table {0}.t1".format(unique_database))
  for gone in ("{}/t1/t1.txt", "{}/t1/"):
    assert not fs.exists(gone.format(db_dir))
  trash_dir = get_trash_path("test-warehouse", unique_database + ".db")
  for kept in ("{}/t1/t1.txt", "{}/t1"):
    assert fs.exists(kept.format(trash_dir))

  # With purge nothing may be left behind, neither in place nor in the trash.
  run("drop table {0}.t2 purge".format(unique_database))
  if not (IS_S3 or IS_ADLS):
    # In S3, deletes are eventual. So even though the table was dropped, its files may
    # still be visible for some unbounded time. This happens only with PURGE. A regular
    # DROP TABLE is just a copy of files which is consistent.
    # The ADLS Python client is not strongly consistent, so these files may still be
    # visible after a DROP. (Remove after IMPALA-5335 is resolved)
    for gone in ("{}/t2/", "{}/t2/t2.txt"):
      assert not fs.exists(gone.format(db_dir))
  for gone in ("{}/t2/t2.txt", "{}/t2"):
    assert not fs.exists(gone.format(trash_dir))

  # Same exercise with an external table t3: its data must survive the drop.
  ext_dir = "{}/data_t3".format(db_dir)
  ext_file = ext_dir + "/data.txt"
  fs.make_dir(ext_dir + "/", permission=777)
  fs.create_file(ext_file, file_data='100')
  run("create external table {0}.t3(i int) stored as "
      "textfile location '{1}/data_t3'".format(unique_database, db_dir))
  run("drop table {0}.t3 purge".format(unique_database))
  assert fs.exists(ext_file)
  fs.delete_file_dir(ext_dir, recursive=True)
| |
| @SkipIfFS.eventually_consistent |
| @SkipIfLocal.hdfs_client |
def test_drop_cleans_hdfs_dirs(self, unique_database):
  """Check which directories are removed by DROP TABLE, DROP DATABASE and
  DROP DATABASE ... CASCADE, and that the location of an external table is kept."""
  run = self.client.execute
  fs = self.filesystem_client
  db_dir = "{0}/{1}.db/".format(WAREHOUSE, unique_database)
  t1_dir = db_dir + "t1/"
  t2_dir = db_dir + "t2/"
  # Location used for the external table; it is not below the '<db>.db' directory.
  ext_dir = "{0}/{1}/t3/".format(WAREHOUSE, unique_database)

  run('use default')
  # The db directory must already exist.
  assert fs.exists(db_dir)

  # Creating a table creates its directory.
  run("create table {0}.t1(i int)".format(unique_database))
  assert fs.exists(t1_dir)

  # Dropping the table removes the table's directory but preserves the db's directory.
  run("drop table {0}.t1".format(unique_database))
  assert not fs.exists(t1_dir)
  assert fs.exists(db_dir)

  # Dropping the db removes the db's directory.
  run("drop database {0}".format(unique_database))
  assert not fs.exists(db_dir)

  # Dropping the db with "cascade" removes the directories of the db and of all its
  # tables, but keeps the directory of the external table.
  self._create_db(unique_database)
  for tbl in ("t1", "t2"):
    run("create table {0}.{1}(i int)".format(unique_database, tbl))
  run("create external table {0}.t3(i int) "
      "location '{1}'".format(unique_database, ext_dir))
  run("drop database {0} cascade".format(unique_database))
  for removed in (db_dir, t1_dir, t2_dir):
    assert not fs.exists(removed)
  assert fs.exists(ext_dir)
  fs.delete_file_dir(ext_dir, recursive=True)
  assert not fs.exists(ext_dir)
  # Re-create the database to make the unique_database teardown succeed.
  self._create_db(unique_database)
| |
| @SkipIfFS.eventually_consistent |
| @SkipIfLocal.hdfs_client |
def test_truncate_cleans_hdfs_files(self, unique_database):
  """Check that TRUNCATE TABLE empties the directory of an unpartitioned table and
  empties, but keeps, the partition directory of a partitioned table."""
  run = self.client.execute
  fs = self.filesystem_client
  db_dir = "{0}/{1}.db/".format(WAREHOUSE, unique_database)
  t1_dir = db_dir + "t1/"
  t2_dir = db_dir + "t2/"
  t2_part_dir = db_dir + "t2/p=1"

  def entry_count(path):
    # Number of entries the filesystem client lists for 'path'.
    return len(fs.ls(path))

  # The db directory must already exist.
  assert fs.exists(db_dir)

  run("create table {0}.t1(i int)".format(unique_database))
  assert fs.exists(t1_dir)

  try:
    # If we're testing S3, we want the staging directory to be created.
    run("set s3_skip_insert_staging=false")
    run("insert into {0}.t1 values (1)".format(unique_database))
    # After the insert the table directory is expected to list exactly two entries.
    assert entry_count(t1_dir) == 2

    # Truncating the table removes the data files and the staging directory.
    run("truncate table {0}.t1".format(unique_database))
    assert entry_count(t1_dir) == 0

    run("create table {0}.t2(i int) partitioned by (p int)".format(unique_database))
    assert fs.exists(t2_dir)

    # The insert creates the partition directory, which must hold exactly one file.
    run("insert into {0}.t2 partition(p=1) values (1)".format(unique_database))
    assert entry_count(t2_part_dir) == 1

    # Truncating removes the data files and preserves the partition's directory.
    run("truncate table {0}.t2".format(unique_database))
    assert fs.exists(t2_part_dir)
    assert entry_count(t2_part_dir) == 0
  finally:
    # Reset the option to its default value.
    run("set s3_skip_insert_staging=true")
| |
| @SkipIfFS.incorrent_reported_ec |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_truncate_table(self, vector, unique_database):
  """Run the QueryTest/truncate-table test file in 'unique_database' with
  abort_on_error disabled."""
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False
  use_multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/truncate-table', vector, use_db=unique_database,
      multiple_impalad=use_multiple)
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_create_database(self, vector, unique_database):
  """Run the QueryTest/create-database test file.

  'unique_database' hands the .test file a database name of its own, which is what
  allows this test to run in parallel with others.
  """
  run_kwargs = {
      'use_db': unique_database,
      'multiple_impalad': self._use_multiple_impalad(vector),
  }
  self.run_test_case('QueryTest/create-database', vector, **run_kwargs)
| |
def test_comment_on_database(self, unique_database):
  """Set a series of comments on the database with COMMENT ON DATABASE and check
  after each statement what _get_db_comment() reports."""
  long_comment = 'a' * 4000
  # Pairs of (SQL value used in the statement, comment expected to be read back).
  cases = [
      ("'comment'", 'comment'),
      ("''", ''),
      ("'\\'comment\\''", "\\'comment\\'"),
      ("null", ''),
      ("'{0}'".format(long_comment), long_comment),
  ]

  # A database without a comment reports the empty string.
  assert self._get_db_comment(unique_database) == ''

  for sql_value, expected in cases:
    self.client.execute(
        "comment on database {0} is {1}".format(unique_database, sql_value))
    assert self._get_db_comment(unique_database) == expected
| |
def test_alter_database_set_owner(self, unique_database):
  """Set first a user and then a role as database owner and check that the owner
  properties hold exactly that single owner afterwards."""
  for owner_type, owner_name in (('user', 'foo_user'), ('role', 'foo_role')):
    self.client.execute("alter database {0} set owner {1} {2}".format(
        unique_database, owner_type, owner_name))
    owner_props = self._get_db_owner_properties(unique_database)
    assert len(owner_props) == 1
    assert owner_props == {owner_name: owner_type.upper()}
| |
def test_metadata_after_alter_database(self, unique_database):
  """Check that the table and the function of a database are still listed after the
  owner of the database has been changed."""
  run = self.client.execute
  create_fn = ("create function {0}.f() returns int "
               "location '{1}/libTestUdfs.so' symbol='NoArgs'")

  run("create table {0}.tbl (i int)".format(unique_database))
  run(create_fn.format(unique_database, WAREHOUSE))
  run("alter database {0} set owner user foo_user".format(unique_database))

  listed_tables = run("show tables in {0}".format(unique_database)).get_data()
  assert listed_tables == "tbl"
  listed_funcs = run("show functions in {0}".format(unique_database)).get_data()
  assert listed_funcs == "INT\tf()\tNATIVE\ttrue"
| |
def test_alter_database_set_db_properties(self, unique_database):
  """Set the database property 'k' to 'v' and read it back."""
  stmt = "alter database {0} set dbproperties('k'='v')".format(unique_database)
  self.client.execute(stmt)
  db_props = self._get_db_properties(unique_database)
  assert db_props['k'] == 'v'
| |
def test_alter_table_set_owner(self, unique_database):
  """Set first a user and then a role as owner of a table and check the owner that
  is reported after each change."""
  table_name = "{0}.test_owner_tbl".format(unique_database)
  self.client.execute("create table {0}(i int)".format(table_name))

  for owner_type, owner_name in (('user', 'foo_user'), ('role', 'foo_role')):
    self.client.execute("alter table {0} set owner {1} {2}".format(
        table_name, owner_type, owner_name))
    reported = self._get_table_or_view_owner(table_name)
    assert reported == (owner_name, owner_type.upper())
| |
def test_alter_view_set_owner(self, unique_database):
  """Set first a user and then a role as owner of a view and check the owner that
  is reported after each change."""
  view_name = "{0}.test_owner_tbl".format(unique_database)

  def owner_after(owner_clause):
    # Applies 'owner_clause' to the view and returns the owner reported afterwards.
    self.client.execute(
        "alter view {0} set owner {1}".format(view_name, owner_clause))
    return self._get_table_or_view_owner(view_name)

  self.client.execute("create view {0} as select 1".format(view_name))
  assert owner_after("user foo_user") == ('foo_user', 'USER')
  assert owner_after("role foo_role") == ('foo_role', 'ROLE')
| |
| # There is a query in QueryTest/create-table that references nested types, which is not |
| # supported if old joins and aggs are enabled. Since we do not get any meaningful |
| # additional coverage by running a DDL test under the old aggs and joins, it can be |
| # skipped. |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_create_table(self, vector, unique_database):
  """Run the QueryTest/create-table test file in 'unique_database' with
  abort_on_error disabled."""
  vector.get_value('exec_option').update(abort_on_error=False)
  multiple = self._use_multiple_impalad(vector)
  self.run_test_case(
      'QueryTest/create-table', vector, use_db=unique_database,
      multiple_impalad=multiple)
| |
| @SkipIfFS.incorrent_reported_ec |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_create_table_like_table(self, vector, unique_database):
  """Run the QueryTest/create-table-like-table test file in 'unique_database' with
  abort_on_error disabled."""
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False
  test_file = 'QueryTest/create-table-like-table'
  self.run_test_case(test_file, vector, use_db=unique_database,
      multiple_impalad=self._use_multiple_impalad(vector))
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_create_table_like_file(self, vector, unique_database):
  """Run the QueryTest/create-table-like-file test file in 'unique_database' with
  abort_on_error disabled."""
  vector.get_value('exec_option')['abort_on_error'] = False
  run_kwargs = dict(
      use_db=unique_database,
      multiple_impalad=self._use_multiple_impalad(vector))
  self.run_test_case('QueryTest/create-table-like-file', vector, **run_kwargs)
| |
| @SkipIfHive2.orc |
| @SkipIfFS.hive |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_create_table_like_file_orc(self, vector, unique_database):
  """Run the QueryTest/create-table-like-file-orc test file.

  The test file receives, through $TRANSACTIONAL_COMPLEXTYPESTBL_FILE, the path of a
  'bucket*' file found inside the 'base*' directory of complextypestbl_orc_def.
  """
  table_dir = ('test-warehouse/managed/functional_orc_def.db/'
               'complextypestbl_orc_def/')
  list_dir = self.filesystem_client.ls
  # Take the first listed entry with the wanted prefix; an IndexError here means that
  # no such entry was listed.
  base_dir = [name for name in list_dir(table_dir) if name.startswith('base')][0]
  bucket_file = [name for name in list_dir(table_dir + base_dir)
                 if name.startswith('bucket')][0]

  vector.get_value('exec_option')['abort_on_error'] = False
  create_table_from_orc(self.client, unique_database,
      'timestamp_with_local_timezone')

  orc_file = FILESYSTEM_PREFIX + '/' + table_dir + base_dir + '/' + bucket_file
  self.run_test_case('QueryTest/create-table-like-file-orc', vector,
      use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector),
      test_file_vars={'$TRANSACTIONAL_COMPLEXTYPESTBL_FILE': orc_file})
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_create_table_as_select(self, vector, unique_database):
  """Run the QueryTest/create-table-as-select test file in 'unique_database' with
  abort_on_error disabled."""
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False
  multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/create-table-as-select', vector,
      use_db=unique_database, multiple_impalad=multiple)
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
| @SkipIfKudu.no_hybrid_clock() |
def test_create_kudu(self, vector, unique_database):
  """Run the QueryTest/kudu_create test file in 'unique_database' with
  abort_on_error disabled and kudu_read_mode set to READ_AT_SNAPSHOT."""
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False
  exec_options['kudu_read_mode'] = "READ_AT_SNAPSHOT"
  multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database,
      multiple_impalad=multiple)
| |
def test_comment_on_table(self, unique_database):
  """Set a series of comments on a table with COMMENT ON TABLE and check after each
  statement what _get_table_or_view_comment() reports."""
  table = '{0}.comment_table'.format(unique_database)

  def comment_after(sql_value):
    # Sets the table comment to 'sql_value' and returns the comment read back.
    self.client.execute("comment on table {0} is {1}".format(table, sql_value))
    return self._get_table_or_view_comment(table)

  self.client.execute("create table {0} (i int)".format(table))
  # A freshly created table has no comment at all.
  assert self._get_table_or_view_comment(table) is None

  assert comment_after("'comment'") == "comment"
  assert comment_after("''") == ""
  # The escaped quotes are read back with doubled backslashes.
  assert comment_after("'\\'comment\\''") == "\\\\'comment\\\\'"
  # Setting the comment to null removes it again.
  assert comment_after("null") is None

  long_comment = 'a' * 4000
  assert comment_after("'{0}'".format(long_comment)) == long_comment
| |
def test_comment_on_view(self, unique_database):
  """Set a series of comments on a view with COMMENT ON VIEW and check after each
  statement what _get_table_or_view_comment() reports."""
  view = '{0}.comment_view'.format(unique_database)
  read_comment = self._get_table_or_view_comment

  def set_comment(sql_value):
    # Issues COMMENT ON VIEW with 'sql_value' as the new comment.
    self.client.execute("comment on view {0} is {1}".format(view, sql_value))

  self.client.execute("create view {0} as select 1".format(view))
  # A freshly created view has no comment at all.
  assert read_comment(view) is None

  set_comment("'comment'")
  assert read_comment(view) == "comment"

  set_comment("''")
  assert read_comment(view) == ""

  # The escaped quotes are read back with doubled backslashes.
  set_comment("'\\'comment\\''")
  assert read_comment(view) == "\\\\'comment\\\\'"

  # Setting the comment to null removes it again.
  set_comment("null")
  assert read_comment(view) is None

  long_comment = 'a' * 4000
  set_comment("'{0}'".format(long_comment))
  assert read_comment(view) == long_comment
| |
def test_comment_on_column(self, unique_database):
  """Set a series of comments on columns of a partitioned table and of a view with
  COMMENT ON COLUMN and check after each statement what _get_column_comment()
  reports."""
  table = "{0}.comment_table".format(unique_database)
  view = "{0}.comment_view".format(unique_database)
  long_comment = 'a' * 4000
  long_sql_value = "'{0}'".format(long_comment)

  def comment_after(target, column, sql_value):
    # Sets the comment of 'target'.'column' and returns the comment read back.
    self.client.execute(
        "comment on column {0}.{1} is {2}".format(target, column, sql_value))
    return self._get_column_comment(target, column)

  self.client.execute(
      "create table {0} (i int) partitioned by (j int)".format(table))
  # A column without a comment reports the empty string.
  assert self._get_column_comment(table, 'i') == ''

  # Regular column first, then the partition column.
  assert comment_after(table, 'i', "'comment 1'") == "comment 1"
  assert comment_after(table, 'j', "'comment 2'") == "comment 2"

  assert comment_after(table, 'i', "''") == ""
  assert comment_after(table, 'i', "'\\'comment\\''") == "\\'comment\\'"
  assert comment_after(table, 'i', "null") == ""

  # The same sequence against the column of a view.
  self.client.execute("create view {0}(i) as select 1".format(view))
  assert self._get_column_comment(view, 'i') == ""

  assert comment_after(view, 'i', "'comment'") == "comment"
  assert comment_after(view, 'i', "''") == ""
  assert comment_after(view, 'i', "'\\'comment\\''") == "\\'comment\\'"
  assert comment_after(view, 'i', "null") == ""

  # A comment of 4000 characters, on the table column and then on the view column.
  assert comment_after(table, 'i', long_sql_value) == long_comment
  assert comment_after(view, 'i', long_sql_value) == long_comment
| |
def test_create_table_max_column_comment(self, unique_database):
  """Create a table whose column carries a comment of 4000 characters and check
  that the same comment is read back."""
  table = '{0}.max_comment_tbl'.format(unique_database)
  expected = 'a' * 4000
  ddl = "create table {0} (i int comment '{1}')".format(table, expected)
  self.client.execute(ddl)
  assert self._get_column_comment(table, 'i') == expected
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_sync_ddl_drop(self, unique_database):
  """Verify that the catalog is updated properly when an object is dropped while
  sync_ddl is enabled."""
  self.client.set_configuration({'sync_ddl': 1})
  # The drop happens right after the database was created (within a statestore
  # heartbeat); the database must no longer be listed afterwards.
  drop_stmt = "drop database {0}".format(unique_database)
  self.client.execute(drop_stmt)
  visible_dbs = self.all_db_names()
  assert unique_database not in visible_dbs
  # Re-create the database to make the unique_database teardown succeed.
  self._create_db(unique_database)
| |
| # TODO: don't use hdfs_client |
| @SkipIfLocal.hdfs_client |
| @SkipIfFS.incorrent_reported_ec |
| @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2) |
def test_alter_table(self, vector, unique_database):
  """Run the QueryTest/alter-table test file after preparing a 'part_data' table
  directory that contains one data file."""
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False

  # An unpartitioned table gives us a filesystem directory that does not use the
  # (key=value) format. The unique_database fixture cleans the directory up
  # automatically.
  self.client.execute("create table {0}.part_data (i int)".format(unique_database))
  table_dir = "{0}/{1}.db/part_data".format(WAREHOUSE, unique_database)
  fs = self.filesystem_client
  assert fs.exists(table_dir)
  fs.create_file(table_dir + "/data.txt", file_data='1984')

  multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/alter-table', vector, use_db=unique_database,
      multiple_impalad=multiple)
| |
| @SkipIfFS.hdfs_caching |
| @SkipIfLocal.hdfs_client |
| @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2) |
def test_alter_table_hdfs_caching(self, vector, unique_database):
  """Run the QueryTest/alter-table-hdfs-caching test file in 'unique_database'."""
  multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/alter-table-hdfs-caching', vector,
      use_db=unique_database, multiple_impalad=multiple)
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_alter_set_column_stats(self, vector, unique_database):
  """Run the QueryTest/alter-table-set-column-stats test file in
  'unique_database'."""
  run_kwargs = {
      'use_db': unique_database,
      'multiple_impalad': self._use_multiple_impalad(vector),
  }
  self.run_test_case('QueryTest/alter-table-set-column-stats', vector, **run_kwargs)
| |
| # Run serially as alter waits for catalog to catch up before marking "DDL finished" so |
| # we don't have a good way to confirm the alter timeline if catalog gets delayed. |
| @pytest.mark.execute_serially |
def test_alter_table_rename_independent(self, vector, unique_database):
  """Tests that two alter table renames run concurrently do not block each other.

  Both renames run with a debug action that sleeps for 5000 ms. They must both
  finish within 15 seconds, and the read lock wait reported in each runtime profile
  must stay below 5000 ms.
  """
  # tbl_1 and tbl_2 are created below; tbl_3 and tbl_4 are the rename targets.
  names = {i: "{}.tbl_{}".format(unique_database, i) for i in range(1, 5)}

  def rename_stmt(src, dst):
    return "alter table {} rename to {}".format(names[src], names[dst])

  def read_lock_wait_ms(profile):
    # The profile must contain exactly one read lock line; its last token is the
    # duration string.
    matches = re.findall(r"Got catalog version read lock: [^ ]*", profile)
    assert len(matches) == 1
    duration_text = matches[0].split(" ")[-1]
    return parse_duration_string_ms(duration_text)

  for i in (1, 2):
    self.client.execute("create table {} (i int)".format(names[i]))
  # Ensure loading metadata is not a factor in alter execution time.
  for i in (1, 2):
    self.client.execute("describe {}".format(names[i]))

  delayed_vector = deepcopy(vector)
  delayed_vector.get_value('exec_option')['debug_action'] = \
      "catalogd_table_rename_delay:SLEEP@5000"
  with self.create_impala_client_from_vector(delayed_vector) as client1, \
       self.create_impala_client_from_vector(delayed_vector) as client2:
    started_at = time.time()
    handle1 = client1.execute_async(rename_stmt(1, 3))
    handle2 = client2.execute_async(rename_stmt(2, 4))
    assert client1.wait_for_finished_timeout(handle1, timeout=15)
    assert client2.wait_for_finished_timeout(handle2, timeout=15)
    assert time.time() - started_at < 15

    for client, handle in ((client1, handle1), (client2, handle2)):
      profile = client.get_runtime_profile(handle)
      assert read_lock_wait_ms(profile) < 5000

    client1.close_query(handle1)
    client2.close_query(handle2)
| |
| @UniqueDatabase.parametrize(num_dbs=2) |
def test_concurrent_alter_table_rename(self, vector, unique_database):
  """Run 16 rename workloads on a pool of 8 threads and require all of them to finish.

  Each workload creates its own table (partitioned or not, depending on its index)
  and then renames it to a new name and back again four times, either inside
  'unique_database' or over to the database '<unique_database>2'. Every statement
  must finish within 60 seconds and every workload within 100 seconds. When a
  workload times out, the server stack traces are dumped before the test fails.
  """
  test_self = self

  class ThreadLocalClient(threading.local):
    """Thread-local holder; __init__ runs once in every thread that uses the instance,
    so each thread gets a client of its own."""
    def __init__(self):
      self.client = test_self.create_impala_client_from_vector(vector)

  tlc = ThreadLocalClient()

  def run_rename(i):
    """Create table tbl_<i> and move it back and forth; returns True on success."""
    # NOTE(review): the option stays set on the thread's client, so later workloads
    # that reuse the same thread also see it - confirm that this is intended.
    if i % 2 == 0:
      tlc.client.set_configuration_option("sync_ddl", "1")
    is_partitioned = i % 4 < 2
    tbl_name = "{}.tbl_{}".format(unique_database, i)
    tlc.client.execute("create table {}(i int){}".format(
        tbl_name, "partitioned by(p int)" if is_partitioned else ""))
    if i % 8 < 4:
      # Rename inside the same db
      new_tbl_name = tbl_name + "_new"
    else:
      # Move to another db
      new_tbl_name = "{}2.tbl_{}".format(unique_database, i)
    stmts = [
        "alter table {} rename to {}".format(tbl_name, new_tbl_name),
        "alter table {} rename to {}".format(new_tbl_name, tbl_name),
    ]
    # Move the table back and forth in several rounds
    for _ in range(4):
      for query in stmts:
        # Run the query asynchronously to avoid getting stuck by it
        handle = tlc.client.execute_async(query)
        is_finished = tlc.client.wait_for_finished_timeout(handle, timeout=60)
        assert is_finished, "Query timeout(60s): " + query
        tlc.client.close_query(handle)
    return True

  NUM_ITERS = 16
  pool = ThreadPool(processes=8)
  try:
    # Submit all workloads before waiting on any, so that they run in parallel.
    workers = [(i, pool.apply_async(run_rename, (i,)))
               for i in range(1, NUM_ITERS + 1)]
    for i, worker in workers:
      try:
        assert worker.get(timeout=100)
      except TimeoutError:
        dump_server_stacktraces()
        assert False, "Timeout in thread run_rename(%d)" % i
  finally:
    # Release the pool threads also when a workload failed or timed out.
    pool.terminate()
| |
| @SkipIfFS.hbase |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_alter_hbase_set_column_stats(self, vector, unique_database):
  """Run the QueryTest/alter-hbase-table-set-column-stats test file in
  'unique_database'."""
  test_file = 'QueryTest/alter-hbase-table-set-column-stats'
  multiple = self._use_multiple_impalad(vector)
  self.run_test_case(test_file, vector, use_db=unique_database,
      multiple_impalad=multiple)
| |
| @SkipIfLocal.hdfs_client |
def test_drop_partition_with_purge(self, unique_database):
  """Verifies that ALTER TABLE <tbl> DROP PARTITION moves the partition data to the
  trash, while the same statement with PURGE skips the trash."""
  run = self.client.execute
  fs = self.filesystem_client
  db_dir = "{0}/{1}.db".format(WAREHOUSE, unique_database)
  # Partition directory and data file of both partitions, relative to a root dir.
  part1, file1 = "t1/j=1", "t1/j=1/j1.txt"
  part2, file2 = "t1/j=2", "t1/j=2/j2.txt"

  def under(root, relative):
    return "{0}/{1}".format(root, relative)

  run("create table {0}.t1(i int) partitioned by (j int)".format(unique_database))
  # Add the partitions (j=1) and (j=2) and put one data file into each of them.
  for j in (1, 2):
    run("alter table {0}.t1 add partition(j={1})".format(unique_database, j))
  fs.create_file(under(db_dir, file1), file_data='j1')
  fs.create_file(under(db_dir, file2), file_data='j2')

  # Without purge the partition disappears from the table and shows up in trash.
  run("alter table {0}.t1 drop partition(j=1)".format(unique_database))
  assert not fs.exists(under(db_dir, file1))
  assert not fs.exists(under(db_dir, part1))
  trash_dir = get_trash_path("test-warehouse", unique_database + ".db")
  assert fs.exists(under(trash_dir, file1))
  assert fs.exists(under(trash_dir, part1))

  # With purge the partition must not show up in the trash.
  run("alter table {0}.t1 drop partition(j=2) purge".format(unique_database))
  if not (IS_S3 or IS_ADLS):
    # In S3, deletes are eventual. So even though the partition was dropped, its files
    # may still be visible for some unbounded time. This happens only with PURGE. A
    # regular DROP TABLE is just a copy of files which is consistent.
    # The ADLS Python client is not strongly consistent, so these files may still be
    # visible after a DROP. (Remove after IMPALA-5335 is resolved)
    assert not fs.exists(under(db_dir, file2))
    assert not fs.exists(under(db_dir, part2))
  assert not fs.exists(under(trash_dir, file2))
  assert not fs.exists(under(trash_dir, part2))
| |
| @UniqueDatabase.parametrize(sync_ddl=True) |
def test_views_ddl(self, vector, unique_database):
  """Run the QueryTest/views-ddl test file in 'unique_database' with abort_on_error
  disabled."""
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False
  run_kwargs = {
      'use_db': unique_database,
      'multiple_impalad': self._use_multiple_impalad(vector),
  }
  self.run_test_case('QueryTest/views-ddl', vector, **run_kwargs)
| |
| # IMPALA-13176: Calcite does not support broadcast or shuffle hints yet. |
| @SkipIfCalcite.hints_not_supported |
| @UniqueDatabase.parametrize() |
def test_view_hints(self, unique_database):
  """Check that the join hints of a view definition are stored and applied.

  The hints are verified directly in the DESCRIBE FORMATTED output, because that is
  independent of how Hive formats the output and is hard to express with the
  automated test case runner. Every hint must show up twice: once for the original
  text and once for the expanded text.
  """
  view = "{0}.hints_test".format(unique_database)
  self.client.execute("""
      create view {0}.hints_test as
      select /* +straight_join */ a.* from functional.alltypestiny a
      inner join /* +broadcast */ functional.alltypes b on a.id = b.id
      inner join /* +shuffle */ functional.alltypessmall c on b.id = c.id
      """.format(unique_database))

  described = self.execute_query("describe formatted {0}".format(view))
  hint_rows = {'straight_join': 0, 'broadcast': 0, 'shuffle': 0}
  for row in described.data:
    for hint in hint_rows:
      if '-- +' + hint in row:
        hint_rows[hint] += 1
  assert hint_rows['straight_join'] == 2
  assert hint_rows['broadcast'] == 2
  assert hint_rows['shuffle'] == 2

  # Test querying the hinted view.
  counted = self.execute_query("select count(*) from {0}".format(view))
  assert counted.success
  assert len(counted.data) == 1
  assert counted.data[0] == '8'

  # Test the plan to make sure that the hints were applied correctly.
  # NOTE(review): the whitespace inside this literal has to match the EXPLAIN output
  # exactly - confirm the '|  ' prefix of the nested scan lines.
  expected_plan = """PLAN-ROOT SINK
08:EXCHANGE [UNPARTITIONED]
04:HASH JOIN [INNER JOIN, PARTITIONED]
|--07:EXCHANGE [HASH(c.id)]
|  02:SCAN {filesystem_name} [functional.alltypessmall c]
06:EXCHANGE [HASH(b.id)]
03:HASH JOIN [INNER JOIN, BROADCAST]
|--05:EXCHANGE [BROADCAST]
|  01:SCAN {filesystem_name} [functional.alltypes b]
00:SCAN {filesystem_name} [functional.alltypestiny a]"""
  explained = self.execute_query("explain select * from {0}".format(view),
      query_options={'explain_level': 0})
  actual_plan = '\n'.join(explained.data)
  assert expected_plan.format(filesystem_name=FILESYSTEM_NAME) in actual_plan
| |
def _verify_describe_view(self, vector, view_name, expected_substr):
  """
  Verify across all impalads that the view 'view_name' has the given substring in its
  expanded SQL.

  If SYNC_DDL is enabled, the verification should complete immediately. Otherwise,
  loops waiting for the expected condition to pass.

  For every impalad of the e2e test cluster a client is created from 'vector', the
  view is described through it until the 'View Expanded' line contains
  'expected_substr', and the client is closed again. The line is lower-cased before
  the comparison, so 'expected_substr' has to be given in lower case.

  Raises AssertionError ("ran out of attempts") if an impalad does not show the
  expected text within the allowed number of attempts (1 with SYNC_DDL, else 60,
  with a one second pause between attempts).
  """
  if vector.get_value('exec_option')['sync_ddl']:
    num_attempts = 1
  else:
    num_attempts = 60
  for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
    client = impalad.service.create_client_from_vector(vector)
    try:
      for attempt in itertools.count(1):
        assert attempt <= num_attempts, "ran out of attempts"
        try:
          result = self.execute_query_expect_success(
              client, "describe formatted %s" % view_name)
          exp_line = [line for line in result.data if 'View Expanded' in line][0]
        except IMPALA_CONNECTION_EXCEPTION as e:
          # In non-SYNC_DDL tests, it's OK to get a "missing view" type error
          # until the metadata propagates.
          exp_line = "Exception: %s" % e
        if expected_substr in exp_line.lower():
          # This impalad is up to date. Only leave the retry loop: returning here
          # would skip the verification of all remaining impalads.
          break
        time.sleep(1)
    finally:
      client.close()
| |
def test_views_describe(self, vector, unique_database):
  """IMPALA-6896: Tests that altered views can be described by all impalads.

  A view is created and then altered through a client connected to the first impalad
  of the cluster; after each statement _verify_describe_view() checks the view's
  expanded SQL.
  """
  view_name = "%s.test_describe_view" % unique_database
  first_impalad = ImpalaCluster.get_e2e_test_cluster().impalads[0]
  first_client = first_impalad.service.create_client_from_vector(vector)
  # Pairs of (DDL statement template, SQL expected in the view description).
  steps = (
      ("create view {0} as "
       "select * from functional.alltypes",
       "select * from functional.alltypes"),
      ("alter view {0} as "
       "select * from functional.alltypesagg",
       "select * from functional.alltypesagg"),
  )
  try:
    for ddl_template, expected_sql in steps:
      self.execute_query_expect_success(first_client, ddl_template.format(view_name))
      self._verify_describe_view(vector, view_name, expected_sql)
  finally:
    first_client.close()
| |
@UniqueDatabase.parametrize(sync_ddl=True)
def test_functions_ddl(self, vector, unique_database):
  """Run the 'QueryTest/functions-ddl' test case with 'unique_database' as the db."""
  use_multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/functions-ddl', vector, use_db=unique_database,
                     multiple_impalad=use_multiple)
| |
@SkipIfLocal.hdfs_client
def test_create_alter_bulk_partition(self, unique_database):
  """Add and alter a larger number of partitions, one statement per partition.

  The scale depends on the exploration strategy: 50 partitions for 'exhaustive',
  10 otherwise. The first fifth of the partitions is added, partition (j=1, s='1')
  is altered, the remaining partitions are added, and finally one row is inserted
  and the table's row count is checked.
  """
  exhaustive = self.exploration_strategy() == 'exhaustive'
  num_parts = 50 if exhaustive else 10
  first_batch_size = num_parts // 5
  fq_tbl_name = unique_database + ".part_test_tbl"

  def add_partitions(part_ids, stmt_template):
    # Adds one partition per id and logs how long each statement took.
    for part_id in part_ids:
      began = time.time()
      self.client.execute(stmt_template.format(fq_tbl_name, part_id))
      LOG.info('ADD PARTITION #%d exec time: %s' % (part_id, time.time() - began))

  self.client.execute("create table {0}(i int) partitioned by(j int, s string) "
                      "location '{1}/{0}'".format(fq_tbl_name, WAREHOUSE))

  # First batch: one fifth of all partitions.
  add_partitions(range(first_batch_size),
                 "alter table {0} add partition(j={1}, s='{1}')")

  # Change the file format of one of the partitions.
  self.client.execute("alter table {0} partition(j=1, s='1')"
                      " set fileformat parquetfile".format(fq_tbl_name))

  # IMPALA-741: point one partition at a non-existent location, twice. Both
  # directories are deleted before the partition location is changed.
  for stale_dir in ("tmp/dont_exist1/", "tmp/dont_exist2/"):
    self.filesystem_client.delete_file_dir(stale_dir, recursive=True)
  for set_location_stmt in (
      "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist1'",
      "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist2'"):
    self.execute_query_expect_success(
        self.client, set_location_stmt.format(fq_tbl_name, WAREHOUSE))

  # Second batch: all remaining partitions.
  add_partitions(range(first_batch_size, num_parts),
                 "alter table {0} add partition(j={1},s='{1}')")

  # A row inserted into one partition must be visible afterwards.
  self.client.execute(
      "insert into table {0} partition(j=1, s='1') select 1".format(fq_tbl_name))
  row_count = self.execute_scalar("select count(*) from {0}".format(fq_tbl_name))
  assert '1' == row_count
| |
@SkipIfLocal.hdfs_client
def test_alter_table_set_fileformat(self, unique_database):
  """Tests the SET FILEFORMAT clause of ALTER TABLE ADD PARTITION.

  One partition is added as Parquet and two more as ORC; SHOW PARTITIONS must then
  list exactly one row mentioning PARQUET and two rows mentioning ORC.
  """
  fq_tbl_name = unique_database + ".p_fileformat"
  self.client.execute(
      "create table {0}(i int) partitioned by (p int)".format(fq_tbl_name))

  add_partition_stmts = (
      # A single partition with Parquet file format.
      "alter table {0} add partition(p=1) set fileformat parquet",
      # Two partitions with ORC file format, added by one statement.
      "alter table {0} add partition(p=2) partition(p=3) set fileformat orc",
  )
  for stmt in add_partition_stmts:
    self.execute_query_expect_success(self.client, stmt.format(fq_tbl_name))

  result = self.execute_query_expect_success(
      self.client, "SHOW PARTITIONS %s" % fq_tbl_name)

  parquet_rows = [line for line in result.data if "PARQUET" in line]
  assert 1 == len(parquet_rows)
  orc_rows = [line for line in result.data if "ORC" in line]
  assert 2 == len(orc_rows)
| |
def test_alter_table_create_many_partitions(self, unique_database):
  """
  Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC
  batch size works, in that it creates all the underlying partitions.
  """
  MAX_PARTITION_UPDATES_PER_RPC = 500
  expected_ids = list(range(MAX_PARTITION_UPDATES_PER_RPC + 2))

  self.client.execute(
      "create table {0}.t(i int) partitioned by (p int)".format(unique_database))
  # All partitions are added by one ALTER TABLE statement.
  partition_clauses = " ".join("partition(p=%d)" % (part_id,)
                               for part_id in expected_ids)
  alter_stmt = "alter table {0}.t add ".format(unique_database) + partition_clauses
  self.client.execute(alter_stmt)

  partitions = self.client.execute("show partitions {0}.t".format(unique_database))
  # The SHOW PARTITIONS output contains the partition paths, which are expected to
  # have one "p=<value>" subdirectory per partition. Extract every "p=<digits>"
  # occurrence, convert the values to integers and compare them, in order, with the
  # partition ids that were created.
  PARTITION_RE = re.compile("p=([0-9]+)")
  found_ids = [int(value) for value in PARTITION_RE.findall(str(partitions.data))]
  assert found_ids == expected_ids
| |
def test_create_alter_tbl_properties(self, unique_database):
  """Check TBLPROPERTIES / SERDEPROPERTIES set by CREATE TABLE and ALTER TABLE SET.

  The properties are read back with _get_tbl_properties() and
  _get_serde_properties() after each statement. With HIVE_MAJOR_VERSION > 2 some
  additional table properties are expected and checked separately.
  """
  fq_tbl_name = unique_database + ".test_alter_tbl"

  # Specify TBLPROPERTIES and SERDEPROPERTIES at CREATE time. 'p1' is given twice;
  # the assertion below expects the later value 'v1'.
  self.client.execute("""create table {0} (i int)
with serdeproperties ('s1'='s2', 's3'='s4')
tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
  tbl_props = self._get_tbl_properties(fq_tbl_name)

  if HIVE_MAJOR_VERSION > 2:
    # Check these extra properties first, then drop them so that the remaining
    # checks are the same for all Hive versions.
    extra_props = (
        ('OBJCAPABILITIES', 'EXTREAD,EXTWRITE'),
        ('TRANSLATED_TO_EXTERNAL', 'TRUE'),
        ('external.table.purge', 'TRUE'),
        ('EXTERNAL', 'TRUE'),
    )
    for key, expected_value in extra_props:
      assert tbl_props[key] == expected_value
    for key, _ in extra_props:
      del tbl_props[key]
  assert len(tbl_props) == 2
  # The transient_lastDdlTime is variable, so don't verify the value.
  assert 'transient_lastDdlTime' in tbl_props
  del tbl_props['transient_lastDdlTime']
  assert {'p1': 'v1'} == tbl_props

  serde_props = self._get_serde_properties(fq_tbl_name)
  assert {'s1': 's2', 's3': 's4'} == serde_props

  # Modify the SERDEPROPERTIES using ALTER TABLE SET.
  self.client.execute("alter table {0} set serdeproperties "
                      "('s1'='new', 's5'='s6')".format(fq_tbl_name))
  serde_props = self._get_serde_properties(fq_tbl_name)
  assert {'s1': 'new', 's3': 's4', 's5': 's6'} == serde_props

  # Modify the TBLPROPERTIES using ALTER TABLE SET.
  self.client.execute(
      "alter table {0} set tblproperties "
      "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name))
  tbl_props = self._get_tbl_properties(fq_tbl_name)

  if HIVE_MAJOR_VERSION > 2:
    assert 'OBJCAPABILITIES' in tbl_props
  assert 'transient_lastDdlTime' in tbl_props
  # The property from CREATE time is still there, next to the newly set ones.
  assert tbl_props['p1'] == 'v1'
  assert tbl_props['prop1'] == 'val1'
  assert tbl_props['p2'] == 'val3'
  assert tbl_props[''] == ''
| |
@SkipIfHive2.acid
def test_create_insertonly_tbl(self, unique_database):
  """Create an insert-only transactional table and check its OBJCAPABILITIES."""
  insertonly_tbl = unique_database + ".test_insertonly"
  create_stmt = """create table {0} (coli int) stored as parquet tblproperties(
'transactional'='true', 'transactional_properties'='insert_only')""".format(
      insertonly_tbl)
  self.client.execute(create_stmt)
  tbl_props = self._get_tbl_properties(insertonly_tbl)
  expected_capabilities = 'HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE'
  assert tbl_props['OBJCAPABILITIES'] == expected_capabilities
| |
def test_alter_tbl_properties_reload(self, unique_database):
  """IMPALA-8734: Force a table schema reload when setting table properties.

  A data file with the lines '' and 'foo' is written into the table directory, then
  'serialization.null.format' is set to 'foo'. A subsequent scan must return the
  second line as NULL.
  """
  tbl_name = "test_tbl"
  create_stmt = "create table {0}.{1} (c1 string)".format(unique_database, tbl_name)
  self.execute_query_expect_success(self.client, create_stmt)

  data_file = "{2}/{0}.db/{1}/f".format(unique_database, tbl_name, WAREHOUSE)
  self.filesystem_client.create_file(data_file, file_data="\nfoo\n")

  alter_stmt = ("alter table {0}.{1} set tblproperties"
                "('serialization.null.format'='foo')").format(unique_database, tbl_name)
  self.execute_query_expect_success(self.client, alter_stmt)

  select_stmt = "select * from {0}.{1}".format(unique_database, tbl_name)
  result = self.execute_query_expect_success(self.client, select_stmt)
  assert len(result.data) == 2
  assert result.data[0] == ''
  assert result.data[1] == 'NULL'
| |
@SkipIfFS.incorrent_reported_ec
@UniqueDatabase.parametrize(sync_ddl=True)
def test_partition_ddl_predicates(self, vector, unique_database):
  """Run the partition-ddl-predicates test cases.

  The 'all-fs' test case is always run; the 'hdfs-only' one is run in addition when
  IS_HDFS is set.
  """
  test_cases = ['QueryTest/partition-ddl-predicates-all-fs']
  if IS_HDFS:
    test_cases.append('QueryTest/partition-ddl-predicates-hdfs-only')
  for test_case in test_cases:
    self.run_test_case(test_case, vector, use_db=unique_database,
                       multiple_impalad=self._use_multiple_impalad(vector))
| |
def test_create_table_file_format(self, unique_database):
  """Check how the 'default_file_format' query option affects CREATE TABLE.

  Covers: option not set (TEXTFILE expected), an invalid option value (the statement
  must fail), option set to parquet, and an explicit STORED AS ORC clause combined
  with the option set to parquet.
  """
  def assert_show_create_mentions(table, file_format):
    # SHOW CREATE TABLE must contain 'file_format' in at least one output row.
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(table))
    assert any(file_format in row for row in result.data)

  # When default_file_format query option is not specified, the default table file
  # format is TEXT.
  text_table = "{0}.text_tbl".format(unique_database)
  self.execute_query_expect_success(
      self.client, "create table {0}(i int)".format(text_table))
  assert_show_create_mentions(text_table, "TEXTFILE")

  # An unknown file format name is rejected.
  self.execute_query_expect_failure(
      self.client, "create table {0}.foobar_tbl".format(unique_database),
      {"default_file_format": "foobar"})

  parquet_table = "{0}.parquet_tbl".format(unique_database)
  self.execute_query_expect_success(
      self.client, "create table {0}(i int)".format(parquet_table),
      {"default_file_format": "parquet"})
  assert_show_create_mentions(parquet_table, "PARQUET")

  # The table created should still be ORC even though the default_file_format query
  # option is set to parquet.
  orc_table = "{0}.orc_tbl".format(unique_database)
  self.execute_query_expect_success(
      self.client,
      "create table {0}(i int) stored as orc".format(orc_table),
      {"default_file_format": "parquet"})
  assert_show_create_mentions(orc_table, "ORC")
| |
@SkipIfHive2.acid
def test_create_table_transactional_type(self, unique_database):
  """Check how the 'default_transactional_type' query option affects CREATE TABLE.

  For each case a table is created with the option set and the "Table Parameters"
  section is then inspected for the 'transactional' and 'transactional_properties'
  entries.
  """
  def create_and_get_params(tbl_suffix, create_template, transactional_type):
    # Creates '<unique_database>.<tbl_suffix>' and returns its table parameters.
    table = "{0}.{1}".format(unique_database, tbl_suffix)
    self.execute_query_expect_success(
        self.client, create_template.format(table),
        {"default_transactional_type": transactional_type})
    return self._get_properties("Table Parameters", table)

  def assert_not_transactional(params):
    assert "transactional" not in params
    assert "transactional_properties" not in params

  # With the option set to "none" the transaction related table properties are
  # not set.
  assert_not_transactional(
      create_and_get_params("non_acid_tbl", "create table {0}(i int)", "none"))

  # Create table as "insert_only" transactional.
  props = create_and_get_params(
      "insert_only_acid_tbl", "create table {0}(i int)", "insert_only")
  assert props["transactional"] == "true"
  assert props["transactional_properties"] == "insert_only"

  # default_transactional_type query option should not affect external tables
  assert_not_transactional(create_and_get_params(
      "external_tbl", "create external table {0}(i int)", "insert_only"))

  # default_transactional_type query option should not affect Kudu tables.
  assert_not_transactional(create_and_get_params(
      "kudu_tbl", "create table {0}(i int primary key) stored as kudu",
      "insert_only"))

  # default_transactional_type query option should have no effect when transactional
  # table properties are set manually.
  assert_not_transactional(create_and_get_params(
      "manual_acid_tbl",
      "create table {0}(i int) TBLPROPERTIES ('transactional'='false')",
      "insert_only"))
| |
def test_kudu_column_comment(self, unique_database):
  """Check column comments of Kudu tables after CREATE, COMMENT ON and ALTER TABLE.

  Every step executes one statement and then compares the comment returned by
  _get_column_comment() for one column with the expected text.
  """
  def run_and_check(table, stmt_template, column, expected_comment):
    self.client.execute(stmt_template.format(table))
    comment = self._get_column_comment(table, column)
    assert expected_comment == comment

  # A comment given for the primary key column at CREATE time.
  run_and_check("{0}.kudu_table0".format(unique_database),
                "create table {0}(x int comment 'x' primary key) "
                "stored as kudu",
                'x', "x")

  # All remaining steps work on a second table whose column starts without comment.
  table = "{0}.kudu_table".format(unique_database)
  steps = (
      ("create table {0}(i int primary key) stored as kudu", 'i', ""),
      ("comment on column {0}.i is 'comment1'", 'i', "comment1"),
      ("comment on column {0}.i is ''", 'i', ""),
      ("comment on column {0}.i is 'comment2'", 'i', "comment2"),
      ("comment on column {0}.i is null", 'i', ""),
      ("alter table {0} alter column i set comment 'comment3'", 'i', "comment3"),
      ("alter table {0} alter column i set comment ''", 'i', ""),
      ("alter table {0} add columns (j int comment 'comment4')", 'j', "comment4"),
  )
  for stmt_template, column, expected_comment in steps:
    run_and_check(table, stmt_template, column, expected_comment)
| |
@UniqueDatabase.parametrize(sync_ddl=True)
def test_describe_materialized_view(self, vector, unique_database):
  """Run the 'QueryTest/describe-materialized-view' test case.

  'abort_on_error' is switched off in the vector's exec options before the test case
  is started.
  """
  exec_options = vector.get_value('exec_option')
  exec_options['abort_on_error'] = False
  use_multiple = self._use_multiple_impalad(vector)
  self.run_test_case('QueryTest/describe-materialized-view', vector,
                     use_db=unique_database, multiple_impalad=use_multiple)
| |
| |
| # IMPALA-10811: RPC to submit query getting stuck for AWS NLB forever |
| # Test HS2, Beeswax and HS2-HTTP three clients. |
class TestAsyncDDL(TestDdlBase):
  """Tests for asynchronously executed DDL statements (IMPALA-10811).

  The test matrix gets a client protocol dimension, so the tests cover the HS2,
  Beeswax and HS2-HTTP clients.
  """

  @classmethod
  def add_test_dimensions(cls):
    super(TestAsyncDDL, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        sync_ddl=[0], disable_codegen_options=[False]))

  def test_async_ddl(self, vector, unique_database):
    """Run the 'QueryTest/async_ddl' test case with 'unique_database' as the db."""
    self.run_test_case('QueryTest/async_ddl', vector, use_db=unique_database)

  def test_async_ddl_with_JDBC(self, unique_database):
    """Run a CREATE TABLE and a CTAS through JDBC and compare the reported results."""
    self.exec_with_jdbc("drop table if exists {0}.test_table".format(unique_database))
    self.exec_with_jdbc_and_compare_result(
        "create table {0}.test_table(a int)".format(unique_database),
        "'Table has been created.'")

    self.exec_with_jdbc("drop table if exists {0}.alltypes_clone".format(unique_database))
    # The statement is assembled from adjacent literals with an explicit space after
    # "from". A backslash continuation inside the literal would make the separator
    # between "from" and the table name depend on the indentation of the next line.
    self.exec_with_jdbc_and_compare_result(
        "create table {0}.alltypes_clone as select * from "
        "functional_parquet.alltypes".format(unique_database),
        "'Inserted 7300 row(s)'")

  @classmethod
  def test_get_operation_status_for_client(cls, client, unique_database):
    """Run a delayed CTAS asynchronously through 'client' and check its states.

    'client' is the connection to use and 'unique_database' the database in which
    the table is created. The submission itself must return within 3 seconds; the
    query is then expected to be PENDING and to reach RUNNING and FINISHED
    afterwards. The query handle is closed at the end, the client is left open.

    NOTE(review): this helper takes 'client' as a plain argument although its name
    starts with 'test_' - confirm how the test runner treats it.
    """
    # Setup
    SLEEP_S = 2
    client.execute("select count(*) from functional_parquet.alltypes")
    client.set_configuration_option("enable_async_ddl_execution", "true")
    client.set_configuration_option("debug_action",
        "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@%s" % (SLEEP_S * 1000))
    # Run the test query which will only compile the DDL in execute_statement()
    # and measure the time spent. Should be less than 3s.
    start = time.time()
    handle = client.execute_async(
        "create table {0}.alltypes_clone as select sleep({1})".format(
            unique_database, SLEEP_S * 1000))
    end = time.time()
    assert (end - start <= 3)

    # The table creation and population part will be done in a separate thread.
    # The query must be in PENDING state after execute_async and enter RUNNING state
    # after creating the table in catalogd (at least SLEEP_S delay). The query can
    # enter FINISHED state after another delay of at least SLEEP_S.
    assert client.get_impala_exec_state(handle) == PENDING
    client.wait_for_impala_state(handle, RUNNING, SLEEP_S + 3)
    client.wait_for_impala_state(handle, FINISHED, SLEEP_S + 3)
    client.close_query(handle)

  def test_get_operation_status_for_async_ddl(self, vector, unique_database):
    """Tests that for an asynchronously executed DDL with delay, GetOperationStatus
    must be issued repeatedly. Test client hs2-http, hs2 and beeswax"""
    client = self.default_impala_client(vector.get_value('protocol'))
    self.test_get_operation_status_for_client(client, unique_database)
| |
| |
class TestAsyncDDLTiming(TestDdlBase):
  """Timing checks for DDL executed with and without 'enable_async_ddl_execution'.

  The test matrix gets a client protocol dimension and an
  'enable_async_ddl_execution' dimension with the values True and False.
  """

  @classmethod
  def add_test_dimensions(cls):
    super(TestAsyncDDLTiming, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        sync_ddl=[0], disable_codegen_options=[False]))
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('enable_async_ddl_execution', True, False))

  def test_alter_table_recover(self, vector, unique_database):
    """Time ALTER TABLE RECOVER PARTITIONS that is delayed by a 10s debug action.

    In sync mode the whole DDL, including the delay, is processed by the execute
    call, so the execute call must take at least 10 seconds. In async mode the
    execute call only compiles the DDL and the delayed processing happens while
    waiting, so the wait must take at least 10 seconds.
    """
    use_async_ddl = vector.get_value('enable_async_ddl_execution')
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # Setup: an external table that points to an existing location.
      alltypes_location = get_fs_path("/test-warehouse/alltypes_parquet")
      source_tbl = "functional_parquet.alltypes"
      dest_tbl = "{0}.alltypes_clone".format(unique_database)
      create_table_stmt = 'create external table {0} like {1} location "{2}"'.format(
          dest_tbl, source_tbl, alltypes_location)
      self.execute_query_expect_success(client, create_table_stmt)

      # Describe the table to fetch its metadata
      self.execute_query_expect_success(client, "describe {0}".format(dest_tbl))

      # Configure whether to use async DDL and add the delay on a copy of the vector.
      delayed_vector = deepcopy(vector)
      exec_options = delayed_vector.get_value('exec_option')
      exec_options['enable_async_ddl_execution'] = use_async_ddl
      exec_options['debug_action'] = "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"

      exec_start = time.time()
      alter_stmt = "alter table {0} recover partitions".format(dest_tbl)
      handle = self.execute_query_async_using_client(client, alter_stmt, delayed_vector)
      exec_time = time.time() - exec_start

      state = client.get_impala_exec_state(handle)
      allowed_states = [PENDING, RUNNING] if use_async_ddl else [RUNNING, FINISHED]
      assert state in allowed_states

      # Wait for the statement to finish with a timeout of 20 seconds
      wait_start = time.time()
      client.wait_for_impala_state(handle, FINISHED, 20)
      wait_time = time.time() - wait_start
      self.close_query_using_client(client, handle)

      # The delay must show up in the wait (async mode) or in the execute call
      # (sync mode), see the docstring.
      delayed_phase_time = wait_time if use_async_ddl else exec_time
      assert delayed_phase_time >= 10
    finally:
      client.close()

  def test_ctas(self, vector, unique_database):
    """Time a CREATE TABLE AS SELECT that is delayed by debug actions.

    In sync mode the whole CTAS, including the 10s delay, is processed by the
    execute call, so the execute call must take at least 10 seconds. In async mode
    the execute call only compiles the CTAS and the delayed processing happens while
    waiting, so the wait must take at least 10 seconds.
    """
    use_async_ddl = vector.get_value('enable_async_ddl_execution')
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # The CTAS is going to need the metadata of the source table in the
      # select. To avoid flakiness about metadata loading, this selects from
      # that source table first to get the metadata loaded.
      self.execute_query_expect_success(
          client, "select count(*) from functional_parquet.alltypes")

      # Configure whether to use async DDL and add the delays on a copy of the vector.
      delayed_vector = deepcopy(vector)
      exec_options = delayed_vector.get_value('exec_option')
      exec_options['enable_async_ddl_execution'] = use_async_ddl
      create_delay = "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
      insert_delay = "CRS_BEFORE_COORD_STARTS:SLEEP@2000"
      exec_options['debug_action'] = "{0}|{1}".format(create_delay, insert_delay)

      dest_tbl = "{0}.ctas_test".format(unique_database)
      source_tbl = "functional_parquet.alltypes"
      ctas_stmt = 'create external table {0} as select * from {1}'.format(
          dest_tbl, source_tbl)
      exec_start = time.time()
      handle = self.execute_query_async_using_client(client, ctas_stmt, delayed_vector)
      exec_time = time.time() - exec_start
      # The CRS_BEFORE_COORD_STARTS delay postpones the transition from PENDING
      # to RUNNING, so the sync case should be in PENDING state at the end of
      # the execute call. This means that the sync and async cases are the same.
      assert client.is_pending(handle)

      # Wait for the statement to finish with a timeout of 40 seconds
      # (60 seconds without shortcircuit reads). There are other tests running
      # in parallel and ASAN can be slow, so this timeout has been bumped
      # substantially to avoid flakiness. The actual test case does not depend
      # on the statement finishing in a particular amount of time.
      timeout_s = 40 if IS_HDFS else 60
      wait_start = time.time()
      client.wait_for_impala_state(handle, FINISHED, timeout_s)
      wait_time = time.time() - wait_start
      self.close_query_using_client(client, handle)

      # The delay must show up in the wait (async mode) or in the execute call
      # (sync mode), see the docstring.
      delayed_phase_time = wait_time if use_async_ddl else exec_time
      assert delayed_phase_time >= 10
    finally:
      client.close()
| |
| |
class TestDdlLogs(TestDdlBase):
  """Checks that DDL errors are logged, for both sync and async DDL execution."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestDdlLogs, cls).add_test_dimensions()
    async_ddl_options = {'enable_async_ddl_execution': [True, False]}
    cls.ImpalaTestMatrix.add_dimension(
        create_exec_option_dimension_from_dict(async_ddl_options))

  @SkipIfDockerizedCluster.daemon_logs_not_exposed
  def test_error_logs(self, vector, unique_database):
    """INVALIDATE METADATA on a table name that was never created must fail.

    The failure text has to contain a TableNotFoundException for that table, and
    the same message must be found in the impalad INFO log.
    """
    query_opts = vector.get_value('exec_option')
    if query_opts['enable_async_ddl_execution']:
      short_name = 'test_async'
    else:
      short_name = 'test_sync'
    tbl_name = unique_database + '.' + short_name
    result = self.execute_query_expect_failure(
        self.client, 'invalidate metadata ' + tbl_name, query_opts)
    expected_error = "TableNotFoundException: Table not found: " + tbl_name
    assert expected_error in str(result)
    self.assert_impalad_log_contains('INFO', expected_error)
| |
| |
| # IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache. |
class TestLibCache(TestDdlBase):
  """IMPALA-2002: repeated adding/dropping of .jar and .so files in the lib cache."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestLibCache, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  def test_create_drop_function(self, vector, unique_database):
    """This will create, run, and drop the same function repeatedly, exercising the
    lib cache mechanism.
    """
    create_fn_stmt = (
        "create function {0}.f() returns int "
        "location '{1}/libTestUdfs.so' symbol='NoArgs'"
    ).format(unique_database, WAREHOUSE)
    select_stmt = "select {0}.f() from functional.alltypes limit 10".format(
        unique_database)
    # The '%s' is filled in by create_drop_ddl().
    drop_fn_stmt = "drop function %s {0}.f()".format(unique_database)
    self.create_drop_ddl(vector, [create_fn_stmt], [drop_fn_stmt], select_stmt)

  # Run serially because this test inspects global impalad metrics.
  # TODO: The metrics checks could be relaxed to enable running this test in
  # parallel, but that might need a more general wait_for_metric_value().
  @pytest.mark.execute_serially
  def test_create_drop_data_src(self, vector, unique_database):
    """This will create, run, and drop the same data source repeatedly, exercising
    the lib cache mechanism.
    """
    hits_metric = "external-data-source.class-cache.hits"
    misses_metric = "external-data-source.class-cache.misses"

    data_src_name = unique_database + "_datasrc"
    create_ds_stmt = (
        "CREATE DATA SOURCE {0} "
        "LOCATION '{1}/data-sources/test-data-source.jar' "
        "CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' "
        "API_VERSION 'V1'"
    ).format(data_src_name, WAREHOUSE)
    create_tbl_stmt = (
        "CREATE TABLE {0}.data_src_tbl (x int) "
        "PRODUCED BY DATA SOURCE {1}('dummy_init_string')"
    ).format(unique_database, data_src_name)
    # The '%s' in the DROP statements is filled in by create_drop_ddl().
    drop_ds_stmt = "drop data source %s {0}".format(data_src_name)
    drop_tbl_stmt = "drop table %s {0}.data_src_tbl".format(unique_database)
    select_stmt = "select * from {0}.data_src_tbl limit 1".format(unique_database)

    create_stmts = [create_ds_stmt, create_tbl_stmt]
    # The table is dropped before the data source it is produced by.
    drop_stmts = [drop_tbl_stmt, drop_ds_stmt]

    # The ImpaladService is used to capture metrics
    service = self.impalad_test_service

    # Initial metric values
    hits_before = service.get_metric_value(hits_metric)
    misses_before = service.get_metric_value(misses_metric)
    # Test with 1 node so we can check the metrics on only the coordinator
    vector.get_value('exec_option')['num_nodes'] = 1
    num_iterations = 2
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, num_iterations)

    # Check class cache metrics. Shouldn't have any new cache hits, there should be
    # 2 cache misses for every iteration (jar is loaded by both the FE and BE).
    expected_misses = misses_before + (num_iterations * 2)
    service.wait_for_metric_value(hits_metric, hits_before)
    service.wait_for_metric_value(misses_metric, expected_misses)

    # Test with a table that caches the class
    create_tbl_stmt = (
        "CREATE TABLE {0}.data_src_tbl (x int) "
        "PRODUCED BY DATA SOURCE {1}('CACHE_CLASS::dummy_init_string')"
    ).format(unique_database, data_src_name)
    create_stmts = [create_ds_stmt, create_tbl_stmt]
    # Run once before capturing metrics because the class already may be cached from
    # a previous test run.
    # TODO: Provide a way to clear the cache
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)

    # Capture metric values and run again, should hit the cache.
    hits_before = service.get_metric_value(hits_metric)
    misses_before = service.get_metric_value(misses_metric)
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)
    service.wait_for_metric_value(hits_metric, hits_before + 2)
    service.wait_for_metric_value(misses_metric, misses_before)

  def create_drop_ddl(self, vector, create_stmts, drop_stmts, select_stmt,
      num_iterations=3):
    """Helper method to run CREATE/DROP DDL commands repeatedly and exercise the lib
    cache. create_stmts is the list of CREATE statements to be executed in order
    drop_stmts is the list of DROP statements to be executed in order. Each statement
    should have a '%s' placeholder to insert "IF EXISTS" or "". The select_stmt is just a
    single statement to test after executing the CREATE statements.
    TODO: it's hard to tell that the cache is working (i.e. if it did nothing to drop
    the cache, these tests would still pass). Testing that is a bit harder and requires
    us to update the udf binary in the middle.
    """
    self.client.set_configuration(vector.get_value("exec_option"))
    # Start from a clean state: drop whatever may already exist.
    for drop_stmt in drop_stmts:
      self.client.execute(drop_stmt % "if exists")
    for _ in range(num_iterations):
      for create_stmt in create_stmts:
        self.client.execute(create_stmt)
      self.client.execute(select_stmt)
      # Inside an iteration the objects were just created, so no "if exists".
      for drop_stmt in drop_stmts:
        self.client.execute(drop_stmt % "")