| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from subprocess import check_call |
| import logging |
| import pytest |
| import re |
| import time |
| import threading |
| |
| from tests.common.test_dimensions import ( |
| create_single_exec_option_dimension, |
| add_mandatory_exec_option) |
| from tests.common.impala_cluster import ImpalaCluster |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.skip import SkipIfFS, SkipIfHive2, SkipIfCatalogV2 |
| from tests.common.test_vector import HS2 |
| from tests.metadata.test_event_processing_base import TestEventProcessingBase |
| from tests.util.event_processor_utils import EventProcessorUtils |
| |
| PROCESSING_TIMEOUT_S = 30 |
| LOG = logging.getLogger(__name__) |
| |
@SkipIfFS.hive
class TestEventProcessing(TestEventProcessingBase):
  """This class contains tests that exercise the event processing mechanism in the
  catalog."""

  @classmethod
  def setup_class(cls):
    super(TestEventProcessing, cls).setup_class()

  @classmethod
  def default_test_protocol(cls):
    """Return HS2 as the default client protocol for the tests in this class."""
    return HS2

  @SkipIfHive2.acid
  def test_transactional_insert_events(self, unique_database):
    """Executes 'run_test_insert_events' for transactional tables.
    """
    TestEventProcessingBase._run_test_insert_events_impl(self,
        unique_database, is_transactional=True)

  def test_insert_events(self, unique_database):
    """Executes 'run_test_insert_events' for non-transactional tables.
    """
    TestEventProcessingBase._run_test_insert_events_impl(self, unique_database)

  def test_iceberg_inserts(self):
    """IMPALA-10735: INSERT INTO Iceberg table fails during INSERT event generation
    This test doesn't test event processing. It tests that Iceberg INSERTs still work
    when HMS event polling is enabled.
    IMPALA-10736 tracks adding proper support for Hive Replication."""
    db_name = ImpalaTestSuite.get_random_name("iceberg_insert_event_db_")
    tbl_name = "ice_test"
    try:
      self.execute_query("create database if not exists {0}".format(db_name))
      self.execute_query("""
          create table {0}.{1} (i int)
          partitioned by spec (bucket(5, i))
          stored as iceberg;""".format(db_name, tbl_name))
      self.execute_query("insert into {0}.{1} values (1)".format(db_name, tbl_name))
      data = self.execute_scalar("select * from {0}.{1}".format(db_name, tbl_name))
      assert data == '1'
    finally:
      # The database name is random rather than fixture-managed, so drop it here.
      self.execute_query("drop database if exists {0} cascade".format(db_name))

  @pytest.mark.execute_serially
  def test_hive_impala_iceberg_reloads(self, unique_database):
    """Check that each Hive statement on an Iceberg table increases the
    'tables-refreshed' metric by exactly one, and that a following manual REFRESH in
    Impala reports that the reload was skipped."""
    def get_refresh_count():
      return EventProcessorUtils.get_int_metric('tables-refreshed', 0)

    def run_hive_check_refresh(stmt):
      # Run 'stmt' in Hive and expect exactly one refresh once events are processed.
      refresh_before = get_refresh_count()
      self.run_stmt_in_hive(stmt)
      EventProcessorUtils.wait_for_event_processing(self)
      refresh_after = get_refresh_count()
      assert refresh_after - refresh_before == 1

    test_tbl = unique_database + ".test_events"
    self.run_stmt_in_hive("create table {} (value string) \
        partitioned by (year int) stored by iceberg".format(test_tbl))
    EventProcessorUtils.wait_for_event_processing(self)
    self.execute_query("describe {}".format(test_tbl))

    run_hive_check_refresh("insert into {} values ('1', 2025)".format(test_tbl))

    res = self.execute_query("select * from {}".format(test_tbl))
    assert ["1\t2025"] == res.data
    res = self.execute_query("refresh {}".format(test_tbl))
    assert "Iceberg table reload skipped as no change detected" in res.runtime_profile

    run_hive_check_refresh("alter table {} add columns (s string)".format(test_tbl))

    res = self.execute_query("select * from {}".format(test_tbl))
    assert ["1\t2025\tNULL"] == res.data
    res = self.execute_query("refresh {}".format(test_tbl))
    assert "Iceberg table reload skipped as no change detected" in res.runtime_profile

  @SkipIfHive2.acid
  def test_empty_partition_events_transactional(self, unique_database):
    """Run the empty-partition event scenario on a transactional table."""
    self._run_test_empty_partition_events(unique_database, True)

  def test_empty_partition_events(self, unique_database):
    """Run the empty-partition event scenario on a non-transactional table."""
    self._run_test_empty_partition_events(unique_database, False)

  def test_event_based_replication(self):
    """Delegate to the shared event-based replication test implementation."""
    self._run_event_based_replication_tests_impl(self,
        self.filesystem_client)

  def _run_test_empty_partition_events(self, unique_database, is_transactional):
    """Add and drop the same partition from Hive, each time followed by the
    IF [NOT] EXISTS variant of the same statement, and check the partition list seen
    by Impala as well as that the event processor stays ACTIVE.

    'is_transactional' selects the table properties used when creating the table."""
    test_tbl = unique_database + ".test_events"
    TBLPROPERTIES = self._get_transactional_tblproperties(is_transactional)
    self.run_stmt_in_hive("create table {0} (key string, value string) \
        partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
    self.client.set_configuration({
      "sync_hms_events_wait_time_s": PROCESSING_TIMEOUT_S,
      "sync_hms_events_strict_mode": True
    })
    self.client.execute("describe {0}".format(test_tbl))

    # get_impala_partition_info() rows are compared as 1-tuples below, so membership
    # checks must use a 1-tuple as well. ('2019') without the trailing comma is just
    # the string '2019' and would make the "not in" assertions always pass.
    partition_row = ('2019',)

    self.run_stmt_in_hive(
        "alter table {0} add partition (year=2019)".format(test_tbl))
    assert [partition_row] == self.get_impala_partition_info(test_tbl, 'year')

    self.run_stmt_in_hive(
        "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
    assert [partition_row] == self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

    self.run_stmt_in_hive(
        "alter table {0} drop partition (year=2019)".format(test_tbl))
    assert partition_row not in self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

    self.run_stmt_in_hive(
        "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
    assert partition_row not in self.get_impala_partition_info(test_tbl, 'year')
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @pytest.mark.execute_serially
  def test_load_data_from_impala(self, unique_database):
    """Check that LOAD DATA issued from Impala results in exactly one new HMS
    notification, of type INSERT, for both a non-partitioned and a partitioned
    target table."""
    tbl_nopart = "tbl_nopart"
    tbl_part = "tbl_part"
    staging_dir = "/tmp/{0}".format(unique_database)
    check_call(["hdfs", "dfs", "-mkdir", staging_dir])
    try:
      self.execute_query(
        "drop table if exists {0}.{1} purge".format(unique_database, tbl_nopart))
      self.execute_query(
        "drop table if exists {0}.{1} purge".format(unique_database, tbl_part))

      self.execute_query(
        "create table {0}.{1} like functional_parquet.tinytable stored as parquet"
        .format(unique_database, tbl_nopart))
      self.execute_query(
        "create table {0}.{1} like functional_parquet.alltypessmall stored as \
        parquet".format(unique_database, tbl_part))
      EventProcessorUtils.wait_for_event_processing(self)

      check_call([
        "hdfs", "dfs", "-cp", "/test-warehouse/tinytable_parquet", staging_dir])
      # Remember the notification id right before the statement under test so that
      # only the events it generates are fetched afterwards.
      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
      self.execute_query("load data inpath '{0}/tinytable_parquet' \
        into table {1}.{2}".format(staging_dir, unique_database, tbl_nopart))
      # Check if there is an insert event fired after load data statement.
      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
      assert len(events) == 1
      last_event = events[0]
      assert last_event.dbName == unique_database
      assert last_event.tableName == tbl_nopart
      assert last_event.eventType == "INSERT"

      check_call(["hdfs", "dfs", "-cp", "/test-warehouse/alltypessmall_parquet",
        staging_dir])
      self.execute_query(
        "alter table {0}.{1} add partition (year=2009,month=1)".format(
          unique_database, tbl_part))
      # Taken after ADD PARTITION so that its event is not counted below.
      last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
      self.execute_query(
        "load data inpath '{0}/alltypessmall_parquet/year=2009/month=1' \
        into table {1}.{2} partition (year=2009,month=1)".format(
          staging_dir, unique_database, tbl_part))
      # Check if there is an insert event fired after load data statement.
      events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id)
      assert len(events) == 1
      last_event = events[0]
      assert last_event.dbName == unique_database
      assert last_event.tableName == tbl_part
      assert last_event.eventType == "INSERT"
    finally:
      check_call(["hdfs", "dfs", "-rm", "-r", "-skipTrash", staging_dir])

  def test_transact_partition_location_change_from_hive(self, unique_database):
    """IMPALA-12356: Verify alter partition from hive on transactional table"""
    self.run_test_partition_location_change_from_hive(unique_database,
        "transact_alter_part_hive", True)

  def test_partition_location_change_from_hive(self, unique_database):
    """IMPALA-12356: Verify alter partition from hive on non-transactional table"""
    self.run_test_partition_location_change_from_hive(unique_database, "alter_part_hive")

  def run_test_partition_location_change_from_hive(self, unique_database, tbl_name,
      is_transactional=False):
    """Create a partitioned table in Impala, change the location of its only
    partition from Hive and verify that Impala reports the new location once the
    events are processed."""
    fq_tbl_name = unique_database + "." + tbl_name
    TBLPROPERTIES = TestEventProcessingBase._get_transactional_tblproperties(
        is_transactional)
    # Create the table
    self.client.execute(
        "create table %s (i int) partitioned by(j int) stored as parquet %s"
        % (fq_tbl_name, TBLPROPERTIES))
    # Insert some data to a partition
    p1 = "j=1"
    self.client.execute("insert into table %s partition(%s) values (0),(1),(2)"
        % (fq_tbl_name, p1))
    tbl_location = self._get_table_property("Location:", fq_tbl_name)
    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
    assert [("{0}/{1}".format(tbl_location, p1),)] == partitions
    # Alter partition location from hive
    new_part_location = tbl_location + "/j=2"
    self.run_stmt_in_hive("alter table %s partition(%s) set location '%s'"
        % (fq_tbl_name, p1, new_part_location))
    EventProcessorUtils.wait_for_event_processing(self)
    # Verify if the location is updated
    partitions = self.get_impala_partition_info(fq_tbl_name, 'Location')
    assert [(new_part_location,)] == partitions

  def _get_table_property(self, property_name, table_name):
    """Extract the table property value from output of DESCRIBE FORMATTED.

    Only the first row containing 'property_name' is considered. Returns the second
    tab-separated field of that row with trailing whitespace stripped, or None if no
    row matches or that field is 'NULL'."""
    result = self.client.execute("describe formatted {0}".format(table_name))
    for row in result.data:
      if property_name not in row:
        continue
      value = row.split('\t')[1]
      return None if value == 'NULL' else value.rstrip()
    return None

  def _exec_and_check_ep_cmd(self, cmd, expected_status):
    """Run the event processor command 'cmd', check that its output contains the
    status line for 'expected_status' and that the event processor reports the same
    status. Returns the command output."""
    cmd_output = self.execute_query(cmd).get_data()
    match = re.search(
        r"EventProcessor status: %s. LastSyncedEventId: \d+. LatestEventId: \d+." %
        expected_status,
        cmd_output)
    assert match, cmd_output
    assert EventProcessorUtils.get_event_processor_status() == expected_status
    return cmd_output

  @pytest.mark.execute_serially
  def test_event_processor_cmds(self, unique_database):
    """Exercise the ':event_processor(...)' admin command: pause/start/status,
    restarting at older, latest and future event ids, and the error paths."""
    ###########################################################################
    # 1. Test normal PAUSE and RESUME. Also check the STATUS command.
    self._exec_and_check_ep_cmd(":event_processor('pause')", "PAUSED")
    self._exec_and_check_ep_cmd(":event_processor('status')", "PAUSED")
    self._exec_and_check_ep_cmd(":event_processor('start')", "ACTIVE")
    self._exec_and_check_ep_cmd(":event_processor('status')", "ACTIVE")

    # Make sure the CREATE_DATABASE event for 'unique_database' is processed
    EventProcessorUtils.wait_for_event_processing(self)

    ###########################################################################
    # 2. Test failure of restarting at an older event id when status is ACTIVE
    last_synced_event_id = EventProcessorUtils.get_last_synced_event_id()
    # Floor division keeps the id an exact integer. True division yields a float on
    # Python 3, which is not exact for very large ids.
    older_event_id = last_synced_event_id // 2
    e = self.execute_query_expect_failure(
        self.client, ":event_processor('start', %d)" % older_event_id)
    expected_error = (
        "EventProcessor is active. Failed to set last synced event id from "
        + str(last_synced_event_id) + " back to " + str(older_event_id)
        + ". Please pause EventProcessor first.")
    assert expected_error in str(e)

    ###########################################################################
    # 3. Test restarting to the latest event id
    self._exec_and_check_ep_cmd(":event_processor('pause')", "PAUSED")
    # Create some HMS events
    for i in range(3):
      self.run_stmt_in_hive("create table %s.tbl_%d(i int)" % (unique_database, i))
    latest_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client)
    # Wait some time for EP to update its latest event id
    time.sleep(2)
    # Restart to the latest event id
    self._exec_and_check_ep_cmd(":event_processor('start', -1)", "ACTIVE")
    assert EventProcessorUtils.get_last_synced_event_id() == latest_event_id
    # Verify the new events are skipped so Impala queries should fail
    for i in range(3):
      self.execute_query_expect_failure(
          self.client, "describe %s.tbl_%d" % (unique_database, i))

    ###########################################################################
    # 4. Test setting back the last synced event id after pausing EP
    self._exec_and_check_ep_cmd(":event_processor('pause')", "PAUSED")
    # Restart to the previous last synced event id to process the missing HMS events
    self._exec_and_check_ep_cmd(
        ":event_processor('start', %d)" % last_synced_event_id, "ACTIVE")
    EventProcessorUtils.wait_for_event_processing(self)
    # Tables should be visible now
    for i in range(3):
      self.execute_query_expect_success(
          self.client, "describe %s.tbl_%d" % (unique_database, i))

    ###########################################################################
    # 5. Test unknown commands
    e = self.execute_query_expect_failure(self.client, ":event_processor('bad_cmd')")
    assert "Unknown command: BAD_CMD. Supported commands: PAUSE, START, STATUS" in str(e)

    ###########################################################################
    # 6. Test illegal event id
    e = self.execute_query_expect_failure(self.client, ":event_processor('start', -2)")
    assert "Illegal event id -2. Should be >= -1" in str(e)

    ###########################################################################
    # 7. Test restarting on a future event id
    cmd_output = self._exec_and_check_ep_cmd(
        ":event_processor('start', %d)" % (latest_event_id + 2), "ACTIVE")
    warning = ("Target event id %d is larger than the latest event id %d. Some future "
               "events will be skipped.") % (latest_event_id + 2, latest_event_id)
    assert warning in cmd_output
    # The cleanup method will drop 'unique_database' and tables in it, which generates
    # more than 2 self-events. It's OK for EP to skip them.
| |
| |
| @SkipIfFS.hive |
| class TestEventSyncWaiting(ImpalaTestSuite): |
| """Verify query option sync_hms_events_wait_time_s should protect the query by |
| waiting until Impala sync the HMS changes.""" |
| |
@classmethod
def get_workload(cls):
  """Return the name of the workload this test class belongs to."""
  workload_name = 'functional-planner'
  return workload_name
| |
@classmethod
def add_test_dimensions(cls):
  """Extend the inherited test dimensions with the dimension returned by
  create_single_exec_option_dimension() and make the two HMS event sync query
  options mandatory exec options."""
  super(TestEventSyncWaiting, cls).add_test_dimensions()
  exec_option_dimension = create_single_exec_option_dimension()
  cls.ImpalaTestMatrix.add_dimension(exec_option_dimension)
  # Applied in this order: wait time first, then strict mode.
  mandatory_options = (
      ('sync_hms_events_wait_time_s', PROCESSING_TIMEOUT_S),
      ('sync_hms_events_strict_mode', True),
  )
  for option_name, option_value in mandatory_options:
    add_mandatory_exec_option(cls, option_name, option_value)
| |
@pytest.mark.execute_serially
def test_event_processor_pauses(self, vector, unique_database):
  """Submit a query on a table created in Hive while the event processor is paused
  and check that it returns the expected row after the event processor is resumed
  from a background thread."""
  client = self.create_impala_client_from_vector(vector)
  tbl = unique_database + ".foo"

  # Create a table in Hive and submit a query on it when EP is paused.
  client.execute(":event_processor('pause')")
  self.run_stmt_in_hive("create table {} as select 1".format(tbl))

  # execute_async() is not really async that it returns after query planning finishes.
  # So we use execute_query_expect_success here and resume EP in a background thread.
  def resume_event_processor():
    time.sleep(2)
    # Use a separate client: the outer one is busy with the blocked SELECT.
    resume_client = self.create_impala_client_from_vector(vector)
    resume_client.execute(":event_processor('start')")
  resume_ep_thread = threading.Thread(target=resume_event_processor)
  resume_ep_thread.start()
  try:
    res = self.execute_query_expect_success(client, "select * from " + tbl)
    assert res.data == ['1']
  finally:
    # Always wait for the resume thread so it cannot outlive this test, whether the
    # query above succeeded or not.
    resume_ep_thread.join()
| |
def test_describe(self, vector, unique_database):
  """Check that DESCRIBE in Impala returns the columns of a table that was just
  created in Hive, with an empty log and the expected profile timeline item."""
  client = self.create_impala_client_from_vector(vector)
  tbl_name = unique_database + ".tbl"
  # Test DESCRIBE on new table created in Hive
  hive_create_stmt = "create table {0} (i int) partitioned by (p int)".format(tbl_name)
  self.run_stmt_in_hive(hive_create_stmt)
  described = self.execute_query_expect_success(client, "describe " + tbl_name)
  expected_columns = ["i\tint\t", 'p\tint\t']
  assert described.data == expected_columns
  assert described.log == ''
  self.__verify_profile_timeline(described.runtime_profile)
| |
def test_show_tables(self, vector, unique_database):
  """Check that SHOW TABLES and SHOW VIEWS in Impala list a table and a view that
  were just created in Hive."""
  client = self.create_impala_client_from_vector(vector)
  tbl_name = unique_database + ".tbl"

  def check_listing(show_stmt, expected_names):
    # Run 'show_stmt' in Impala and validate its rows, log and profile timeline.
    listing = self.execute_query_expect_success(client, show_stmt)
    assert listing.data == expected_names
    assert listing.log == ''
    self.__verify_profile_timeline(listing.runtime_profile)

  # Test SHOW TABLES gets new tables created in Hive
  self.run_stmt_in_hive("create table {0} (i int)".format(tbl_name))
  check_listing("show tables in " + unique_database, ["tbl"])

  # Test SHOW VIEWS gets new views created in Hive
  self.run_stmt_in_hive(
      "create view {0}.v as select * from {1}".format(unique_database, tbl_name))
  check_listing("show views in " + unique_database, ["v"])
| |
def test_drop_created_table(self, vector, unique_database):
  """Check that DROP TABLE in Impala succeeds on a table just created in Hive."""
  client = self.create_impala_client_from_vector(vector)
  hive_create_stmt = "create table {0}.tbl(i int)".format(unique_database)
  impala_drop_stmt = "drop table {0}.tbl".format(unique_database)
  self.run_stmt_in_hive(hive_create_stmt)
  self.execute_query_expect_success(client, impala_drop_stmt)
| |
def test_insert_under_missing_db(self, vector, unique_name):
  """Check that INSERT in Impala succeeds on a table whose database and table were
  both just created in Hive. The database is dropped from Hive afterwards."""
  client = self.create_impala_client_from_vector(vector)
  db = unique_name + "_db"
  # Create the table in Hive immediately after creating the db. So it's more likely
  # that when the INSERT is submitted, the db is still missing in Impala.
  hive_setup_stmts = """create database {0};
      create table {0}.tbl(i int)""".format(db)
  impala_insert_stmt = "insert into {0}.tbl values (0)".format(db)
  hive_cleanup_stmt = "drop database if exists {0} cascade".format(db)
  try:
    self.run_stmt_in_hive(hive_setup_stmts)
    self.execute_query_expect_success(client, impala_insert_stmt)
  finally:
    self.run_stmt_in_hive(hive_cleanup_stmt)
| |
def test_show_databases(self, vector, unique_database):
  """Check that SHOW DATABASES in Impala reflects a second database being created
  and then dropped in Hive.

  The second database is named '<unique_database>_2'. It is created by this test
  itself, so it is dropped in a 'finally' block if the test fails before reaching
  its own DROP DATABASE step."""
  client = self.create_impala_client_from_vector(vector)
  listed_db = unique_database + "\t"
  listed_extra_db = unique_database + "_2\t"
  # True while '<unique_database>_2' may exist in Hive and still needs cleanup.
  extra_db_needs_cleanup = False
  try:
    res = self.execute_query_expect_success(client, "show databases")
    assert listed_db in res.data
    assert listed_extra_db not in res.data
    self.__verify_profile_timeline(res.runtime_profile)

    # Create a new db in Hive
    # Set the flag first: a failed CREATE may still have created the database.
    extra_db_needs_cleanup = True
    self.run_stmt_in_hive("create database {0}_2".format(unique_database))
    res = self.execute_query_expect_success(client, "show databases")
    assert listed_db in res.data
    assert listed_extra_db in res.data
    self.__verify_profile_timeline(res.runtime_profile)

    # Drop the new db in Hive
    self.run_stmt_in_hive("drop database {0}_2".format(unique_database))
    extra_db_needs_cleanup = False
    res = self.execute_query_expect_success(client, "show databases")
    assert listed_db in res.data
    assert listed_extra_db not in res.data
    self.__verify_profile_timeline(res.runtime_profile)
  finally:
    # Only reached with the flag set when the test failed part-way through; the
    # successful path issues no extra Hive statement.
    if extra_db_needs_cleanup:
      self.run_stmt_in_hive(
          "drop database if exists {0}_2 cascade".format(unique_database))
| |
def test_drop_db(self, vector, unique_name):
  """Check that DROP DATABASE in Impala succeeds on a database just created in Hive.
  The database is dropped from Hive afterwards in case the test failed."""
  client = self.create_impala_client_from_vector(vector)
  db = unique_name + "_db"
  hive_create_stmt = "create database {0}".format(db)
  impala_drop_stmt = "drop database {0}".format(db)
  hive_cleanup_stmt = "drop database if exists {0} cascade".format(db)
  try:
    self.run_stmt_in_hive(hive_create_stmt)
    self.execute_query_expect_success(client, impala_drop_stmt)
  finally:
    self.run_stmt_in_hive(hive_cleanup_stmt)
| |
def test_describe_db(self, vector, unique_name):
  """Check that DESCRIBE DATABASE in Impala succeeds on a database just created in
  Hive. The database is dropped from Hive afterwards."""
  client = self.create_impala_client_from_vector(vector)
  db = unique_name + "_db"
  hive_create_stmt = "create database {0}".format(db)
  impala_describe_stmt = "describe database {0}".format(db)
  hive_cleanup_stmt = "drop database if exists {0}".format(db)
  try:
    self.run_stmt_in_hive(hive_create_stmt)
    self.execute_query_expect_success(client, impala_describe_stmt)
  finally:
    self.run_stmt_in_hive(hive_cleanup_stmt)
| |
def test_show_functions(self, vector, unique_name):
  """Check that SHOW FUNCTIONS in Impala succeeds on a database just created in
  Hive. The database is dropped from Hive afterwards."""
  client = self.create_impala_client_from_vector(vector)
  db = unique_name + "_db"
  hive_create_stmt = "create database {0}".format(db)
  impala_show_stmt = "show functions in {0}".format(db)
  hive_cleanup_stmt = "drop database if exists {0}".format(db)
  try:
    self.run_stmt_in_hive(hive_create_stmt)
    self.execute_query_expect_success(client, impala_show_stmt)
  finally:
    self.run_stmt_in_hive(hive_cleanup_stmt)
| |
def test_hive_insert(self, vector, unique_database):
  """Check that SELECT and SHOW PARTITIONS in Impala see rows and partitions written
  by Hive through INSERT INTO and INSERT OVERWRITE."""
  client = self.create_impala_client_from_vector(vector)
  tbl_name = unique_database + ".tbl"
  select_all_stmt = "select * from " + tbl_name

  def check_log_and_timeline(result):
    # Checks shared by every query in this test, after its data assertions.
    assert result.log == ''
    self.__verify_profile_timeline(result.runtime_profile)

  # Create a partitioned table and DESCRIBE it to make it loaded.
  self.execute_query("create table {0}(i int) partitioned by(p int)".format(tbl_name))
  self.execute_query("describe " + tbl_name)

  # Test SELECT gets new values inserted by Hive
  self.run_stmt_in_hive(
      "insert into table {0} partition (p=0) select 0".format(tbl_name))
  selected = self.execute_query_expect_success(client, select_all_stmt)
  assert selected.data == ["0\t0"]
  check_log_and_timeline(selected)

  # Same case but using INSERT OVERWRITE in Hive
  self.run_stmt_in_hive(
      "insert overwrite table {0} partition (p=0) select 1".format(tbl_name))
  selected = self.execute_query_expect_success(client, select_all_stmt)
  assert selected.data == ["1\t0"]
  check_log_and_timeline(selected)

  # Test SHOW PARTITIONS gets new partitions created by Hive
  self.run_stmt_in_hive(
      "insert into table {0} partition (p=2) select 2".format(tbl_name))
  partitions = self.execute_query_expect_success(client, "show partitions " + tbl_name)
  for partition_name in ('p=0', 'p=2'):
    assert self.has_value(partition_name, partitions.data)
  # 3 result lines: 2 for partitions, 1 for total info
  assert len(partitions.data) == 3
  check_log_and_timeline(partitions)
| |
def test_create_dropped_db(self, vector, unique_name):
  """Test CREATE DATABASE on db dropped by Hive.
  Use unique_name instead of unique_database to avoid cleanup failure overwriting
  the real test failure, i.e. when the test fails, the db probably doesn't exist
  so cleanup of unique_database will also fail."""
  client = self.create_impala_client_from_vector(vector)
  db = unique_name + "_db"
  create_db_stmt = "create database " + db
  self.execute_query(create_db_stmt)
  try:
    self.run_stmt_in_hive("drop database " + db)
    recreated = self.execute_query_expect_success(client, create_db_stmt)
    self.__verify_profile_timeline(recreated.runtime_profile)
  finally:
    cleanup_stmt = "drop database if exists {} cascade".format(db)
    self.execute_query(cleanup_stmt)
| |
def test_create_dropped_table(self, vector, unique_database):
  """Test CREATE TABLE on table dropped by Hive"""
  client = self.create_impala_client_from_vector(vector)
  tbl_name = unique_database + ".tbl"
  describe_stmt = "describe " + tbl_name

  def create_then_describe(create_stmt):
    # Run 'create_stmt' in Impala and return the result of DESCRIBE on the table.
    self.execute_query_expect_success(client, create_stmt)
    return self.execute_query_expect_success(client, describe_stmt)

  # Create a table and DESCRIBE it to make it loaded
  described = create_then_describe("create table {0} (i int)".format(tbl_name))
  assert described.data == ["i\tint\t"]

  # Drop it in Hive and re-create it in Impala using a new schema
  self.run_stmt_in_hive("drop table " + tbl_name)
  described = create_then_describe("create table {0} (j int)".format(tbl_name))
  assert described.data == ["j\tint\t"]
  assert described.log == ''
  self.__verify_profile_timeline(described.runtime_profile)
| |
def __verify_profile_timeline(self, profile):
  """Call verify_timeline_item() on 'profile' for the "Synced events from Metastore"
  item of the "Query Compilation" timeline."""
  timeline_name = "Query Compilation"
  expected_item = "Synced events from Metastore"
  self.verify_timeline_item(timeline_name, expected_item, profile)
| |
def test_multiple_tables(self, vector, unique_database):
  """Check that a three-way join in Impala sees rows inserted by Hive into each of
  the joined tables."""
  client = self.create_impala_client_from_vector(vector)
  # The same join is run before and after Hive writes to the tables.
  join_query = """
      select t1.i from {0}.tbl0 t0, {0}.tbl1 t1, {0}.tbl2 t2
      where t0.i = t1.i and t1.i = t2.i""".format(unique_database)
  table_indexes = range(3)

  for idx in table_indexes:
    self.execute_query("create table {0}.tbl{1} (i int)".format(unique_database, idx))
  empty_join = self.execute_query_expect_success(client, join_query)
  assert len(empty_join.data) == 0

  for idx in table_indexes:
    self.run_stmt_in_hive("insert into table {0}.tbl{1} select 1".format(
        unique_database, idx))
  joined_value = self.execute_scalar_expect_success(client, join_query)
  assert joined_value == "1"
| |
def test_view(self, vector, unique_database):
  """Check that COUNT(*) on a view in Impala reflects an insert into the underlying
  table and a change of the view definition, both made in Hive."""
  client = self.create_impala_client_from_vector(vector)
  tbl = unique_database + ".foo"
  view = unique_database + ".foo_view"
  count_stmt = "select count(*) from {}".format(view)

  def count_via_client():
    # First row of the COUNT(*) result fetched through 'client'.
    counted = self.execute_query_expect_success(client, count_stmt)
    return counted.data[0]

  self.execute_query("create table {}(i int)".format(tbl))
  self.execute_query("create view {} as select * from {}".format(view, tbl))
  # Run a query to make the metadata loaded so they can be stale later.
  initial_count = self.execute_scalar(count_stmt)
  assert initial_count == '0'

  # Modify the table in Hive and read the view in Impala
  self.run_stmt_in_hive("insert into {} select 1".format(tbl))
  assert count_via_client() == '1'

  # Modify the view in Hive and read it in Impala
  self.run_stmt_in_hive(
      "alter view {} as select * from {} where i > 1".format(view, tbl))
  assert count_via_client() == '0'
| |
def test_view_partitioned(self, vector, unique_database):
  """Check that a view filtering on the partition column of a table shows, in
  Impala, the partitions added by Hive and a view definition changed by Hive."""
  client = self.create_impala_client_from_vector(vector)
  tbl = unique_database + ".foo"
  view = unique_database + ".foo_view"
  select_stmt = "select * from " + view

  def view_row_count():
    # Number of rows currently returned by the view through 'client'.
    return len(self.execute_query_expect_success(client, select_stmt).data)

  def view_single_row():
    return self.execute_scalar_expect_success(client, select_stmt)

  self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
  self.execute_query("create view {} as select * from {} where p>0".format(view, tbl))
  assert view_row_count() == 0

  # Ingest data in Hive and read the view in Impala
  # Add a new partition that will be filtered out by the view
  self.run_stmt_in_hive("insert into {} select 0, 0".format(tbl))
  assert view_row_count() == 0

  # Add a new partition that will show up in the view
  self.run_stmt_in_hive("insert into {} select 1, 1".format(tbl))
  assert view_single_row() == '1\t1'

  # Add a new partition and alter the view to only show it
  self.run_stmt_in_hive("insert into {} select 2, 2".format(tbl))
  self.run_stmt_in_hive("alter view {} as select * from {} where p>1".format(view, tbl))
  assert view_single_row() == '2\t2'
| |
def test_compute_stats(self, vector, unique_database):
  """Check the result messages of COMPUTE [INCREMENTAL] STATS in Impala after
  partitions were added or dropped in Hive."""
  client = self.create_impala_client_from_vector(vector)
  tbl = unique_database + ".foo"
  self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))

  # Each scenario: (statement run in Hive, statement run in Impala, expected rows).
  scenarios = [
      # Add one partition in Hive and compute incremental stats on that partition in
      # Impala
      ("insert into {} select 0,0".format(tbl),
       "compute incremental stats {} partition(p=0)".format(tbl),
       ['Updated 1 partition(s) and 1 column(s).']),
      # Add one partition in Hive and compute incremental stats on that table in
      # Impala
      ("insert into {} select 1,1 union all select 2,2".format(tbl),
       "compute incremental stats {}".format(tbl),
       ['Updated 2 partition(s) and 1 column(s).']),
      # Drop two partitions in Hive and compute stats on that table in Impala. The
      # incremental stats will be replaced with non-incremental stats so the
      # remaining partition is updated.
      ("alter table {} drop partition(p<2)".format(tbl),
       "compute stats {}".format(tbl),
       ['Updated 1 partition(s) and 1 column(s).']),
  ]
  for hive_stmt, impala_stmt, expected_rows in scenarios:
    self.run_stmt_in_hive(hive_stmt)
    stats_result = self.execute_query_expect_success(client, impala_stmt)
    assert stats_result.data == expected_rows
| |
def test_ctas(self, vector, unique_database):
  """Check that CREATE TABLE AS SELECT in Impala sees data changes made in Hive to
  the source table, and fails when Hive already created the target table."""
  client = self.create_impala_client_from_vector(vector)
  tbl = unique_database + ".foo"
  tmp_tbl = unique_database + ".tmp"
  self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))

  # Each scenario: (statement run in Hive, CTAS run in Impala, expected rows).
  scenarios = [
      # Add one partition in Hive and use the table in Impala
      ("insert into {} select 0,0".format(tbl),
       "create table {} as select * from {}".format(tmp_tbl, tbl),
       ['Inserted 1 row(s)']),
      # Insert one row into the same partition in Hive and use the table in Impala
      ("insert into {} select 1,0".format(tbl),
       "create table {}_2 as select * from {}".format(tmp_tbl, tbl),
       ['Inserted 2 row(s)']),
      # Truncate the table in Hive and use it in Impala
      ("truncate table {}".format(tbl),
       "create table {}_3 as select * from {}".format(tmp_tbl, tbl),
       ['Inserted 0 row(s)']),
  ]
  for hive_stmt, ctas_stmt, expected_rows in scenarios:
    self.run_stmt_in_hive(hive_stmt)
    ctas_result = self.execute_query_expect_success(client, ctas_stmt)
    assert ctas_result.data == expected_rows

  # Create a table in Hive before CTAS of it in Impala
  self.run_stmt_in_hive("create table {}_4(i int) partitioned by(p int)".format(tbl))
  exception = self.execute_query_expect_failure(
      client, "create table {}_4 as select 1,1".format(tbl))
  expected_error = 'Table already exists: {}_4'.format(tbl)
  assert expected_error in str(exception)
| |
def test_impala_insert(self, vector, unique_database):
  """Check that INSERT ... SELECT in Impala reads the source table as last modified
  by Hive, by looking at the per-partition modified row counts in the profile."""
  client = self.create_impala_client_from_vector(vector)
  tbl = unique_database + ".foo"
  tmp_tbl = unique_database + ".tmp"
  self.execute_query("create table {}(i int) partitioned by(p int)".format(tbl))
  self.execute_query("create table {}(i int) partitioned by(p int)".format(tmp_tbl))
  insert_stmt = "insert into {} partition (p) select * from {}".format(tmp_tbl, tbl)

  # Each scenario: (statement run in Hive, fragments expected in the profile of the
  # following INSERT in Impala).
  # Result rows are "partition_name: num_rows_inserted" for each modified partitions
  scenarios = [
      # Add one partition in Hive and use the table in INSERT in Impala
      ("insert into {} select 0,0".format(tbl),
       ['Partition: p=0\nNumModifiedRows: 1\n']),
      # Insert one row into the same partition in Hive and use the table in INSERT
      # in Impala
      ("insert into {} select 1,0".format(tbl),
       ['Partition: p=0\nNumModifiedRows: 2\n']),
      # Add another new partition in Hive and use the table in INSERT in Impala
      ("insert into {} select 2,2".format(tbl),
       ['Partition: p=0\nNumModifiedRows: 2\n',
        'Partition: p=2\nNumModifiedRows: 1\n']),
      # Drop one partition in Hive and use the table in INSERT in Impala
      ("alter table {} drop partition(p=0)".format(tbl),
       ['Partition: p=2\nNumModifiedRows: 1\n']),
  ]
  for hive_stmt, expected_fragments in scenarios:
    self.run_stmt_in_hive(hive_stmt)
    insert_result = self.execute_query_expect_success(client, insert_stmt)
    for fragment in expected_fragments:
      assert fragment in insert_result.runtime_profile

  # Truncate the table in Hive and use it in INSERT in Impala
  self.run_stmt_in_hive("truncate table {}".format(tbl))
  insert_result = self.execute_query_expect_success(client, insert_stmt)
  assert 'NumModifiedRows:' not in insert_result.runtime_profile
| |
def test_txn(self, vector, unique_database):
    """Checks Impala's view of a transactional table that is changed in Hive.

    The table is created, written to twice and finally minor-compacted in Hive.
    After each step Impala is queried right away and must return the expected
    rows, row count and number of files.
    """
    impala_client = self.create_impala_client_from_vector(vector)
    tbl = unique_database + ".foo"

    def run_in_impala(stmt):
        """Executes 'stmt' with the test's Impala client, expecting success."""
        return self.execute_query_expect_success(impala_client, stmt)

    self.run_stmt_in_hive(
        "create transactional table {}(i int) partitioned by(p int)".format(tbl))
    # Make Impala load the table before it is modified in Hive.
    run_in_impala("describe {}".format(tbl))

    # First Hive insert: the row must be visible in Impala immediately.
    self.run_stmt_in_hive("insert into {} select 0,0".format(tbl))
    rows = run_in_impala("select * from {}".format(tbl)).data
    assert rows == ['0\t0']

    # Second Hive insert: Impala must see both rows, stored in two files.
    self.run_stmt_in_hive("insert into {} select 1,0".format(tbl))
    row_count = run_in_impala("select count(*) from {}".format(tbl)).data
    assert row_count == ['2']
    file_rows = run_in_impala("show files in {}".format(tbl)).data
    assert len(file_rows) == 2

    # Minor compaction in Hive: Impala must list a single file afterwards.
    self.run_stmt_in_hive(
        "alter table {} partition(p=0)compact 'minor' and wait".format(tbl))
    file_rows = run_in_impala("show files in {}".format(tbl)).data
    assert len(file_rows) == 1
| |
def test_hms_event_sync_on_deletion(self, vector, unique_name):
    """Regression test for IMPALA-13829: TWaitForHmsEventResponse not able to collect
    removed objects due to their items in deleteLog being GCed.

    Two databases, and later two tables, are dropped concurrently through HMS
    Thrift clients. Right after each pair of drops, an Impala client that has the
    'collect_catalog_results_delay' debug action set re-creates one of the dropped
    objects; these statements are expected to succeed.
    """
    client = self.create_impala_client_from_vector(vector)
    # Set a sleep time so catalogd has time to GC the deleteLog.
    client.execute("set debug_action='collect_catalog_results_delay:SLEEP@1000'")
    db = unique_name + "_db"
    tbl = db + ".foo"
    self.execute_query("drop database if exists {} cascade".format(db))
    self.execute_query("drop database if exists {}_2 cascade".format(db))
    self.execute_query("create database {}".format(db))
    self.execute_query("create database {}_2".format(db))
    hive_clients = []
    hive_transports = []

    def run_concurrently(target, arg_lists):
        """Runs 'target' once per entry of 'arg_lists', each call in its own thread,
        and returns after all threads have finished."""
        threads = [threading.Thread(target=target, args=args) for args in arg_lists]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()

    try:
        # Create HMS Thrift clients to drop db/tables in the fastest way. This happens
        # inside the 'try' so that a failure while creating the second client still
        # closes the first transport and drops the databases created above.
        for _ in range(2):
            hive_client, transport = ImpalaTestSuite.create_hive_client()
            hive_clients.append(hive_client)
            hive_transports.append(transport)

        # Drop 2 dbs concurrently. So their DROP_DATABASE events are processed together
        # (in the same event batch). We need more than one db to be dropped so one of
        # them in catalogd's deleteLog can be GCed since its version < latest catalog
        # version. Note that this is no longer the way catalogd GCs the deleteLog after
        # IMPALA-13829, but it can be used to trigger the issue before this fix.
        def drop_db_in_hive(i, db_name):
            hive_clients[i].drop_database(db_name, deleteData=True, cascade=True)
            LOG.info("Dropped database {} in Hive".format(db_name))
        run_concurrently(drop_db_in_hive, [[0, db], [1, db + "_2"]])
        client.execute("create database " + db)

        self.execute_query("create table {}(i int)".format(tbl))
        self.execute_query("create table {}_2(i int)".format(tbl))

        # Drop 2 tables concurrently. So their DROP_TABLE events are processed together
        # (in the same event batch). We need more than one table to be dropped so one
        # of them in catalogd's deleteLog can be GCed since its version < latest
        # catalog version. Note that this is no longer the way catalogd GCs the
        # deleteLog after IMPALA-13829, but it can be used to trigger the issue before
        # this fix.
        def drop_table_in_hive(i, tbl_name):
            hive_clients[i].drop_table(db, tbl_name, deleteData=True)
            LOG.info("Dropped table {}.{} in Hive".format(db, tbl_name))
        run_concurrently(drop_table_in_hive, [[0, "foo"], [1, "foo_2"]])
        client.execute("create table {}(i int)".format(tbl))
    finally:
        # Only transports that were actually opened are in the list.
        for transport in hive_transports:
            transport.close()
        self.execute_query("drop database if exists {} cascade".format(db))
        self.execute_query("drop database if exists {}_2 cascade".format(db))
| |
| |
class TestSelfRenameEvent(ImpalaTestSuite):
    """Event-processing test for a table that is renamed and then renamed back."""

    @pytest.mark.execute_serially
    def test_self_rename_events(self, unique_database):
        """Regression test for IMPALA-14307

        Renames tbl_a to tbl_b, pauses the event processor, and renames tbl_b back
        to tbl_a with a delay injected into the rename. While that rename is in
        flight, tbl_b is invalidated. Once the event processor is resumed, the
        'events-skipped' metric must have grown by exactly 2.
        """
        pause_events = ":event_processor('pause')"
        resume_events = ":event_processor('start')"
        rename_a_to_b = "alter table {0}.tbl_a rename to {0}.tbl_b".format(
            unique_database)
        rename_b_to_a = "alter table {0}.tbl_b rename to {0}.tbl_a".format(
            unique_database)
        try:
            catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd
            self.execute_query("create table {}.tbl_a(i int)".format(unique_database))
            # Wait until the CREATE_DATABASE and CREATE_TABLE events are skipped.
            EventProcessorUtils.wait_for_event_processing(self)
            skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0)
            self.execute_query(rename_a_to_b)
            self.execute_query(pause_events)

            with self.create_impala_client() as alter_client:
                # Catalog version right before the delayed rename is submitted.
                baseline_version = catalogd.service.get_catalog_version()
                alter_client.set_configuration(
                    {"debug_action": "catalogd_table_rename_delay:SLEEP@6000"})
                alter_handle = alter_client.execute_async(rename_b_to_a)
                alter_client.wait_for_admission_control(alter_handle, timeout_s=10)
                # Poll for at most 10 seconds until catalogd bumps its version for
                # the rename operation, which indicates that the rename started.
                poll_start = time.time()
                while True:
                    if time.time() - poll_start >= 10.0:
                        break
                    if catalogd.service.get_catalog_version() > baseline_version:
                        break
                    time.sleep(0.05)
                # Give catalogd time to send the alter_table HMS RPC.
                time.sleep(1)
                # Invalidate tbl_b to remove it from the catalog.
                self.execute_query(
                    "invalidate metadata {}.tbl_b".format(unique_database))
                alter_client.wait_for_finished_timeout(alter_handle, timeout=10)
                alter_client.close_query(alter_handle)

            # Resume event processing. The ALTER_TABLE RENAME events should be skipped.
            self.execute_query(resume_events)
            EventProcessorUtils.wait_for_event_processing(self)
            skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0)
            assert skipped_after == skipped_before + 2
        finally:
            # Always restart event processing so that other tests are not affected.
            self.execute_query(resume_events)