| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import absolute_import, division, print_function |
| from builtins import range |
| import logging |
| from os import getenv |
| import pytest |
| |
| |
| from beeswaxd.BeeswaxService import QueryState |
| from hive_metastore.ttypes import FireEventRequest |
| from hive_metastore.ttypes import FireEventRequestData |
| from hive_metastore.ttypes import InsertEventRequestData |
| from tests.common.custom_cluster_test_suite import CustomClusterTestSuite |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.skip import SkipIf, SkipIfFS |
| from tests.util.acid_txn import AcidTxn |
| from tests.util.hive_utils import HiveDbWrapper |
| from tests.util.event_processor_utils import EventProcessorUtils |
| from tests.util.filesystem_utils import WAREHOUSE |
| from tests.util.iceberg_util import IcebergCatalogs |
| |
| HIVE_SITE_HOUSEKEEPING_ON =\ |
| getenv('IMPALA_HOME') + '/fe/src/test/resources/hive-site-housekeeping-on' |
| |
| |
| @SkipIfFS.hive |
| class TestEventProcessingCustomConfigs(CustomClusterTestSuite): |
| """This class contains tests that exercise the event processing mechanism in the |
| catalog for non-default configurations""" |
| CATALOG_URL = "http://localhost:25020" |
| PROCESSING_TIMEOUT_S = 10 |
| |
| @pytest.mark.execute_serially |
| @CustomClusterTestSuite.with_args( |
| impalad_args="--blacklisted_dbs=testBlackListedDb " |
| "--blacklisted_tables=functional_parquet.testBlackListedTbl", |
| catalogd_args="--blacklisted_dbs=testBlackListedDb " |
| "--blacklisted_tables=functional_parquet.testBlackListedTbl " |
| "--hms_event_polling_interval_s=1") |
| def test_events_on_blacklisted_objects(self): |
| """Executes hive queries on blacklisted database and tables and makes sure that |
| event processor does not error out |
| """ |
| try: |
| event_id_before = EventProcessorUtils.get_last_synced_event_id() |
| # create a blacklisted database from hive and make sure event is ignored |
| self.run_stmt_in_hive("create database TESTblackListedDb") |
| # wait until all the events generated above are processed |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| assert EventProcessorUtils.get_last_synced_event_id() > event_id_before |
| # make sure that the blacklisted db is ignored |
| assert "TESTblackListedDb".lower() not in self.all_db_names() |
| |
| event_id_before = EventProcessorUtils.get_last_synced_event_id() |
| self.run_stmt_in_hive("create table testBlackListedDb.testtbl (id int)") |
| # create a table on the blacklisted database with a different case |
| self.run_stmt_in_hive("create table TESTBLACKlISTEDDb.t2 (id int)") |
| self.run_stmt_in_hive( |
| "create table functional_parquet.testBlackListedTbl (id int, val string)" |
| " partitioned by (part int) stored as parquet") |
| self.run_stmt_in_hive( |
| "alter table functional_parquet.testBlackListedTbl add partition (part=1)") |
| # wait until all the events generated above are processed |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| assert EventProcessorUtils.get_last_synced_event_id() > event_id_before |
| # make sure that the black listed table is not created |
| table_names = self.client.execute("show tables in functional_parquet").get_data() |
| assert "testBlackListedTbl".lower() not in table_names |
| |
| event_id_before = EventProcessorUtils.get_last_synced_event_id() |
| # generate a table level event with a different case |
| self.run_stmt_in_hive("drop table functional_parquet.TESTBlackListedTbl") |
| # wait until all the events generated above are processed |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| assert EventProcessorUtils.get_last_synced_event_id() > event_id_before |
| finally: |
| self.run_stmt_in_hive("drop database testBlackListedDb cascade") |
| self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl") |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=10") |
| def test_drop_table_events(self): |
| """IMPALA-10187: Event processing fails on multiple events + DROP TABLE. |
| This test issues ALTER TABLE + DROP in quick succession and checks whether event |
| processing still works. |
| """ |
| event_proc_timeout = 15 |
| db_name = ImpalaTestSuite.get_random_name("drop_event_db_") |
| with HiveDbWrapper(self, db_name): |
| tbl_name = "foo" |
| self.run_stmt_in_hive(""" |
| drop table if exists {db}.{tbl}; |
| create table {db}.{tbl} (id int); |
| insert into {db}.{tbl} values(1);""".format(db=db_name, tbl=tbl_name)) |
| # With MetastoreEventProcessor running, the insert event will be processed. Query |
| # the table from Impala. |
| EventProcessorUtils.wait_for_event_processing(self, event_proc_timeout) |
| # Verify that the data is present in Impala. |
| data = self.execute_scalar("select * from %s.%s" % (db_name, tbl_name)) |
| assert data == '1' |
| # Execute ALTER TABLE + DROP in quick succession so they will be processed in the |
| # same event batch. |
| self.run_stmt_in_hive(""" |
| alter table {db}.{tbl} set tblproperties ('foo'='bar'); |
| drop table {db}.{tbl};""".format(db=db_name, tbl=tbl_name)) |
| EventProcessorUtils.wait_for_event_processing(self, event_proc_timeout) |
| # Check that the event processor status is still ACTIVE. |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| |
| @CustomClusterTestSuite.with_args( |
| cluster_size=1, |
| catalogd_args="--hms_event_polling_interval_s=1") |
| def test_create_drop_events(self, unique_database): |
| """Regression test for IMPALA-10502. The test runs very slow with default |
| statestored update frequency and hence this is changed to a custom cluster |
| test.""" |
| self.__run_create_drop_test(unique_database, "database") |
| self.__run_create_drop_test(unique_database, "table") |
| self.__run_create_drop_test(unique_database, "table", True) |
| self.__run_create_drop_test(unique_database, "table", True, True) |
| self.__run_create_drop_test(unique_database, "partition") |
| |
| @pytest.mark.execute_serially |
| @CustomClusterTestSuite.with_args( |
| impalad_args="--use_local_catalog=true", |
| catalogd_args="--catalog_topic_mode=minimal --hms_event_polling_interval_s=1", |
| cluster_size=1) |
| def test_local_catalog_create_drop_events(self, unique_database): |
| """ |
| Test is similar to the test_create_drop_events except this runs on local |
| """ |
| self.__run_create_drop_test(unique_database, "database") |
| self.__run_create_drop_test(unique_database, "table") |
| self.__run_create_drop_test(unique_database, "table", True) |
| self.__run_create_drop_test(unique_database, "table", True, True) |
| self.__run_create_drop_test(unique_database, "partition") |
| |
| def __run_create_drop_test(self, db, type, rename=False, rename_db=False): |
| if type == "table": |
| if not rename: |
| queries = [ |
| "create table {0}.test_{1} (i int)".format(db, 1), |
| "drop table {0}.test_{1}".format(db, 1) |
| ] |
| else: |
| db_1 = "{}_1".format(db) |
| if rename_db: |
| self.execute_query_expect_success(self.create_impala_client(), |
| "drop database if exists {0} cascade".format(db_1)) |
| self.execute_query_expect_success(self.create_impala_client(), |
| "create database {0}".format(db_1)) |
| self.execute_query_expect_success(self.create_impala_client(), |
| "create table if not exists {0}.rename_test_1 (i int)".format(db)) |
| if rename_db: |
| queries = [ |
| "alter table {0}.rename_test_1 rename to {1}.rename_test_1".format(db, |
| db_1), |
| "alter table {0}.rename_test_1 rename to {1}.rename_test_1".format(db_1, db) |
| ] |
| else: |
| queries = [ |
| "alter table {0}.rename_test_1 rename to {0}.rename_test_2".format(db), |
| "alter table {0}.rename_test_2 rename to {0}.rename_test_1".format(db) |
| ] |
| create_metric_name = "tables-added" |
| removed_metric_name = "tables-removed" |
| elif type == "database": |
| self.execute_query_expect_success(self.create_impala_client(), |
| "drop database if exists {0}".format("test_create_drop_db")) |
| queries = [ |
| "create database {db}".format(db="test_create_drop_db"), |
| "drop database {db}".format(db="test_create_drop_db") |
| ] |
| create_metric_name = "databases-added" |
| removed_metric_name = "databases-removed" |
| else: |
| tbl_name = "test_create_drop_partition" |
| self.execute_query_expect_success(self.create_impala_client(), |
| "create table {db}.{tbl} (c int) partitioned by (p int)".format( |
| db=db, tbl=tbl_name)) |
| queries = [ |
| "alter table {db}.{tbl} add partition (p=1)".format(db=db, tbl=tbl_name), |
| "alter table {db}.{tbl} drop partition (p=1)".format(db=db, tbl=tbl_name) |
| ] |
| create_metric_name = "partitions-added" |
| removed_metric_name = "partitions-removed" |
| |
| # get the metric before values |
| EventProcessorUtils.wait_for_event_processing(self) |
| create_metric_val_before = EventProcessorUtils.get_int_metric(create_metric_name, 0) |
| removed_metric_val_before = EventProcessorUtils.get_int_metric(removed_metric_name, 0) |
| events_skipped_before = EventProcessorUtils.get_int_metric('events-skipped', 0) |
| num_iters = 100 |
| for iter in range(num_iters): |
| for q in queries: |
| try: |
| self.execute_query_expect_success(self.create_impala_client(), q) |
| except Exception as e: |
| print("Failed in {} iterations. Error {}".format(iter, str(e))) |
| raise |
| EventProcessorUtils.wait_for_event_processing(self) |
| create_metric_val_after = EventProcessorUtils.get_int_metric(create_metric_name, 0) |
| removed_metric_val_after = EventProcessorUtils.get_int_metric(removed_metric_name, 0) |
| events_skipped_after = EventProcessorUtils.get_int_metric('events-skipped', 0) |
| num_delete_event_entries = EventProcessorUtils.\ |
| get_int_metric('delete-event-log-size', 0) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| # None of the queries above should actually trigger a add/remove object from events |
| assert create_metric_val_after == create_metric_val_before |
| assert removed_metric_val_after == removed_metric_val_before |
| # each query set generates 2 events and both of them should be skipped |
| assert events_skipped_after == num_iters * 2 + events_skipped_before |
| # make sure that there are no more entries in the delete event log |
| assert num_delete_event_entries == 0 |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") |
| def test_self_events(self, unique_database): |
| """Runs multiple queries which generate events and makes |
| sure that tables and partitions are not refreshed the queries is run from Impala. If |
| the queries are run from Hive, we make sure that the tables and partitions are |
| refreshed""" |
| self.__run_self_events_test(unique_database, True) |
| self.__run_self_events_test(unique_database, False) |
| |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--hms_event_polling_interval_s=5" |
| " --enable_reload_events=true") |
| def test_refresh_invalidate_events(self, unique_database): |
| self.run_test_refresh_invalidate_events(unique_database, "reload_table") |
| |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--hms_event_polling_interval_s=5" |
| " --enable_reload_events=true" |
| " --enable_sync_to_latest_event_on_ddls=true") |
| def test_refresh_invalidate_events_enable_sync_to_latest_events(self, unique_database): |
| self.run_test_refresh_invalidate_events(unique_database, "reload_table_sync", True) |
| |
| def run_test_refresh_invalidate_events(self, unique_database, test_reload_table, |
| enable_sync_to_latest_event_on_ddls=False): |
| """Test is to verify Impala-11808, refresh/invalidate commands should generate a |
| Reload event in HMS and CatalogD's event processor should process this event. |
| """ |
| self.client.execute( |
| "create table {}.{} (i int) partitioned by (year int) " |
| .format(unique_database, test_reload_table)) |
| self.client.execute( |
| "insert into {}.{} partition (year=2022) values (1),(2),(3)" |
| .format(unique_database, test_reload_table)) |
| self.client.execute( |
| "insert into {}.{} partition (year=2023) values (1),(2),(3)" |
| .format(unique_database, test_reload_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| |
| def check_self_events(query): |
| tbls_refreshed_before, partitions_refreshed_before, \ |
| events_skipped_before = self.__get_self_event_metrics() |
| last_event_id = EventProcessorUtils.get_current_notification_id(self.hive_client) |
| self.client.execute(query) |
| # Check if there is a reload event fired after refresh query. |
| events = EventProcessorUtils.get_next_notification(self.hive_client, last_event_id) |
| assert len(events) == 1 |
| last_event = events[0] |
| assert last_event.dbName == unique_database |
| assert last_event.tableName == test_reload_table |
| assert last_event.eventType == "RELOAD" |
| EventProcessorUtils.wait_for_event_processing(self) |
| tbls_refreshed_after, partitions_refreshed_after, \ |
| events_skipped_after = self.__get_self_event_metrics() |
| assert events_skipped_after > events_skipped_before |
| |
| check_self_events("refresh {}.{} partition(year=2022)" |
| .format(unique_database, test_reload_table)) |
| check_self_events("refresh {}.{}".format(unique_database, test_reload_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| |
| if enable_sync_to_latest_event_on_ddls: |
| # Test to verify if older events are being skipped in event processor |
| data = FireEventRequestData() |
| data.refreshEvent = True |
| req = FireEventRequest(True, data) |
| req.dbName = unique_database |
| req.tableName = test_reload_table |
| # table level reload events |
| tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events() |
| for i in range(10): |
| self.hive_client.fire_listener_event(req) |
| EventProcessorUtils.wait_for_event_processing(self) |
| tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events() |
| assert tbl_events_skipped_after > tbl_events_skipped_before |
| # partition level reload events |
| EventProcessorUtils.wait_for_event_processing(self) |
| part_events_skipped_before = EventProcessorUtils.get_num_skipped_events() |
| req.partitionVals = ["2022"] |
| for i in range(10): |
| self.hive_client.fire_listener_event(req) |
| EventProcessorUtils.wait_for_event_processing(self) |
| part_events_skipped_after = EventProcessorUtils.get_num_skipped_events() |
| assert part_events_skipped_after > part_events_skipped_before |
| |
| # Test to verify IMPALA-12213 |
| table = self.hive_client.get_table(unique_database, test_reload_table) |
| table.dbName = unique_database |
| table.tableName = "test_sequence_table" |
| self.hive_client.create_table(table) |
| data = FireEventRequestData() |
| data.refreshEvent = True |
| req = FireEventRequest(True, data) |
| req.dbName = unique_database |
| req.tableName = "test_sequence_table" |
| self.hive_client.fire_listener_event(req) |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--enable_reload_events=true") |
| def test_reload_events_with_transient_partitions(self, unique_database): |
| tbl = unique_database + ".tbl" |
| create_stmt = "create table {} (i int) partitioned by(p int)".format(tbl) |
| add_part_stmt = "alter table {} add if not exists partition(p=0)".format(tbl) |
| drop_part_stmt = "alter table {} drop if exists partition(p=0)".format(tbl) |
| refresh_stmt = "refresh {} partition(p=0)".format(tbl) |
| end_states = [self.client.QUERY_STATES['FINISHED'], |
| self.client.QUERY_STATES['EXCEPTION']] |
| |
| self.execute_query(create_stmt) |
| self.execute_query(add_part_stmt) |
| # Run REFRESH partition in the background so we can drop the partition concurrently. |
| refresh_handle = self.client.execute_async(refresh_stmt) |
| # Before IMPALA-12855, REFRESH usually fails in 2-3 rounds. |
| for i in range(100): |
| self.execute_query(drop_part_stmt) |
| refresh_state = self.wait_for_any_state(refresh_handle, end_states, 10) |
| assert refresh_state == self.client.QUERY_STATES['FINISHED'],\ |
| "REFRESH state: {}. Error log: {}".format( |
| QueryState._VALUES_TO_NAMES[refresh_state], |
| self.client.get_log(refresh_handle)) |
| self.execute_query(add_part_stmt) |
| refresh_handle = self.client.execute_async(refresh_stmt) |
| |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--hms_event_polling_interval_s=10" |
| " --enable_skipping_older_events=true" |
| " --enable_sync_to_latest_event_on_ddls=true") |
| def test_skipping_older_events(self, unique_database): |
| """Test is to verify IMPALA-11535, event processor should ignore older events if the |
| current event id is older than the lastRefreshEventId on the table/partition |
| """ |
| test_old_table = "test_old_table" |
| |
| def verify_skipping_older_events(table_name, is_transactional, is_partitioned): |
| query = " ".join(["create", "transactional" if is_transactional else '', |
| "table {}.{} (i int)", "partitioned by (year int)" if is_partitioned else '']) |
| self.run_stmt_in_hive(query.format(unique_database, table_name)) |
| values = "values (10),(20),(30)" |
| EventProcessorUtils.wait_for_event_processing(self) |
| |
| def verify_skipping_hive_stmt_events(stmt, new_table_name): |
| tbl_events_skipped_before = EventProcessorUtils.get_num_skipped_events() |
| self.run_stmt_in_hive(stmt) |
| self.client.execute( |
| "refresh {}.{}".format(unique_database, new_table_name)) |
| tables_refreshed_before = EventProcessorUtils.get_int_metric("tables-refreshed") |
| partitions_refreshed_before = \ |
| EventProcessorUtils.get_int_metric("partitions-refreshed") |
| EventProcessorUtils.wait_for_event_processing(self) |
| tbl_events_skipped_after = EventProcessorUtils.get_num_skipped_events() |
| assert tbl_events_skipped_after > tbl_events_skipped_before |
| tables_refreshed_after = EventProcessorUtils.get_int_metric("tables-refreshed") |
| partitions_refreshed_after = \ |
| EventProcessorUtils.get_int_metric("partitions-refreshed") |
| if is_partitioned: |
| assert partitions_refreshed_after == partitions_refreshed_before |
| else: |
| assert tables_refreshed_after == tables_refreshed_before |
| |
| # test single insert event |
| query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)" |
| if is_partitioned else '', values]) |
| verify_skipping_hive_stmt_events( |
| query.format(unique_database, table_name), table_name) |
| # test batch insert events |
| query = " ".join(["insert into `{}`.`{}`", "partition (year=2023)" |
| if is_partitioned else '', values, ";"]) |
| complete_query = "" |
| for _ in range(3): |
| complete_query += query.format(unique_database, table_name) |
| verify_skipping_hive_stmt_events(complete_query, table_name) |
| # Dynamic partitions test |
| query = " ".join(["create", "table `{}`.`{}` (i int)", |
| " partitioned by (year int) " if is_partitioned else '', |
| self.__get_transactional_tblproperties(is_transactional)]) |
| self.client.execute(query.format(unique_database, "new_table")) |
| complete_query = "insert overwrite table `{db}`.`{tbl1}` " \ |
| "select * from `{db}`.`{tbl2}`"\ |
| .format(db=unique_database, tbl1="new_table", tbl2=table_name) |
| verify_skipping_hive_stmt_events(complete_query, "new_table") |
| # Drop the tables before running another test |
| self.client.execute("drop table {}.{}".format(unique_database, table_name)) |
| self.client.execute("drop table {}.{}".format(unique_database, "new_table")) |
| verify_skipping_older_events(test_old_table, False, False) |
| verify_skipping_older_events(test_old_table, True, False) |
| verify_skipping_older_events(test_old_table, False, True) |
| verify_skipping_older_events(test_old_table, True, True) |
| |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--enable_sync_to_latest_event_on_ddls=true " |
| "--debug_actions=catalogd_get_filtered_events_delay:SLEEP@3000 ") |
| def test_skipping_batching_events(self, unique_database): |
| """Test to verify IMPALA-10949, improving batching logic for partition events. |
| Before batching the events, each event is checked if the event id is greater than |
| table's lastSyncEventId then the event can be batched else it can be skipped.""" |
| # Print trace logs from DebugUtils. |
| self.cluster.catalogd.set_jvm_log_level("org.apache.impala.util.DebugUtils", "trace") |
| test_batch_table = "test_batch_table" |
| self.client.execute( |
| "create table {}.{} like functional.alltypes" |
| .format(unique_database, test_batch_table)) |
| self.client.execute( |
| "insert into {}.{} partition (year,month) select * from functional.alltypes" |
| .format(unique_database, test_batch_table)) |
| # Generate batch ALTER_PARTITION events |
| self.run_stmt_in_hive( |
| "analyze table {}.{} compute statistics".format(unique_database, test_batch_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_metric = "batch-events-created" |
| batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| prev_skipped_events = EventProcessorUtils.get_int_metric("events-skipped") |
| self.run_stmt_in_hive( |
| "analyze table {}.{} compute statistics".format(unique_database, test_batch_table)) |
| self.client.execute("refresh {0}.{1}".format(unique_database, test_batch_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| current_skipped_events = EventProcessorUtils.get_int_metric("events-skipped") |
| # Make sure no new batch events are created |
| assert batch_events_2 == batch_events_1 |
| assert current_skipped_events - prev_skipped_events >= 24 |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") |
| def test_commit_compaction_events(self, unique_database): |
| """Test is to verify Impala-11626, commit compaction events triggered in HMS would |
| be consumed by CatalogD's event processor. |
| """ |
| |
| # Test scenario 1: partitioned table |
| test_cc_part_table = "test_cc_partitioned_table" |
| self.run_stmt_in_hive( |
| "create transactional table {}.{} (i int) partitioned by (year int)" |
| .format(unique_database, test_cc_part_table)) |
| for i in range(2): |
| self.run_stmt_in_hive( |
| "insert into {}.{} partition (year=2022) values (1),(2),(3)" |
| .format(unique_database, test_cc_part_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| parts_refreshed_before_compaction = EventProcessorUtils.get_int_metric( |
| "partitions-refreshed") |
| self.client.execute( |
| "select * from {}.{} limit 2" |
| .format(unique_database, test_cc_part_table)) |
| self.run_stmt_in_hive( |
| "alter table {}.{} partition(year=2022) compact 'minor' and wait" |
| .format(unique_database, test_cc_part_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| parts_refreshed_after_compaction = EventProcessorUtils.get_int_metric( |
| "partitions-refreshed") |
| assert parts_refreshed_after_compaction > parts_refreshed_before_compaction |
| |
| # Test scenario 2: |
| test_cc_unpart_tab = "test_cc_unpart_table" |
| self.run_stmt_in_hive( |
| "create transactional table {}.{} (i int)" |
| .format(unique_database, test_cc_unpart_tab)) |
| for i in range(2): |
| self.run_stmt_in_hive( |
| "insert into {}.{} values (1),(2),(3)" |
| .format(unique_database, test_cc_unpart_tab)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| tables_refreshed_before_compaction = EventProcessorUtils.get_int_metric( |
| "tables-refreshed") |
| self.client.execute( |
| "select * from {}.{} limit 2" |
| .format(unique_database, test_cc_unpart_tab)) |
| self.run_stmt_in_hive("alter table {}.{} compact 'minor' and wait" |
| .format(unique_database, test_cc_unpart_tab)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| tables_refreshed_after_compaction = EventProcessorUtils.get_int_metric( |
| "tables-refreshed") |
| assert tables_refreshed_after_compaction > tables_refreshed_before_compaction |
| |
| # Test scenario 3: partitioned table has partition deleted |
| test_cc_part_table = "test_cc_partitioned_table_error" |
| self.run_stmt_in_hive( |
| "create transactional table {}.{} (i int) partitioned by (year int)" |
| .format(unique_database, test_cc_part_table)) |
| for i in range(2): |
| self.run_stmt_in_hive( |
| "insert into {}.{} partition (year=2022) values (1),(2),(3)" |
| .format(unique_database, test_cc_part_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| self.client.execute( |
| "select * from {}.{} limit 2" |
| .format(unique_database, test_cc_part_table)) |
| self.run_stmt_in_hive( |
| "alter table {}.{} partition(year=2022) compact 'minor' and wait" |
| .format(unique_database, test_cc_part_table)) |
| self.run_stmt_in_hive("alter table {}.{} Drop if exists partition(year=2022)" |
| .format(unique_database, test_cc_part_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| |
| # Test scenario 4: process commit compaction for an unloaded table |
| test_cc_part_table = "test_cc_table_unloaded" |
| self.run_stmt_in_hive( |
| "create transactional table {}.{} (i int) partitioned by (year int)" |
| .format(unique_database, test_cc_part_table)) |
| for i in range(2): |
| self.run_stmt_in_hive( |
| "insert into {}.{} partition (year=2022) values (1),(2),(3)" |
| .format(unique_database, test_cc_part_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| self.run_stmt_in_hive( |
| "alter table {}.{} partition(year=2022) compact 'minor' and wait" |
| .format(unique_database, test_cc_part_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") |
| def test_event_batching(self, unique_database): |
| """Runs queries which generate multiple ALTER_PARTITION events which must be |
| batched by events processor. Runs as a custom cluster test to isolate the metric |
| values from other tests.""" |
| testtbl = "test_event_batching" |
| test_acid_tbl = "test_event_batching_acid" |
| acid_props = self.__get_transactional_tblproperties(True) |
| # create test tables |
| self.client.execute( |
| "create table {}.{} like functional.alltypes".format(unique_database, testtbl)) |
| self.client.execute( |
| "insert into {}.{} partition (year,month) select * from functional.alltypes".format( |
| unique_database, testtbl)) |
| self.client.execute( |
| "create table {}.{} (id int) partitioned by (year int, month int) {}".format( |
| unique_database, test_acid_tbl, acid_props)) |
| self.client.execute( |
| "insert into {}.{} partition (year, month) " |
| "select id, year, month from functional.alltypes".format(unique_database, |
| test_acid_tbl)) |
| # run compute stats from impala; this should generate 24 ALTER_PARTITION events which |
| # should be batched together into 1 or more number of events. |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_metric = "batch-events-created" |
| batch_events_1 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| self.client.execute("compute stats {}.{}".format(unique_database, testtbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_2 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| assert batch_events_2 > batch_events_1 |
| # run analyze stats event from hive which generates ALTER_PARTITION event on each |
| # partition of the table |
| self.run_stmt_in_hive( |
| "analyze table {}.{} compute statistics".format(unique_database, testtbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_3 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| assert batch_events_3 > batch_events_2 |
| # in case of transactional table since we batch the events together, the number of |
| # tables refreshed must be far lower than number of events generated |
| num_table_refreshes_1 = EventProcessorUtils.get_int_metric( |
| "tables-refreshed") |
| self.client.execute("compute stats {}.{}".format(unique_database, test_acid_tbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_4 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| num_table_refreshes_2 = EventProcessorUtils.get_int_metric( |
| "tables-refreshed") |
| # we should generate atleast 1 batch event if not more due to the 24 consecutive |
| # ALTER_PARTITION events |
| assert batch_events_4 > batch_events_3 |
| # table should not be refreshed since this is a self-event |
| assert num_table_refreshes_2 == num_table_refreshes_1 |
| self.run_stmt_in_hive( |
| "analyze table {}.{} compute statistics".format(unique_database, test_acid_tbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_5 = EventProcessorUtils.get_int_metric(batch_events_metric) |
| assert batch_events_5 > batch_events_4 |
| num_table_refreshes_2 = EventProcessorUtils.get_int_metric("tables-refreshed") |
| # the analyze table from hive generates 24 ALTER_PARTITION events which should be |
| # batched into 1-2 batches (depending on timing of the event poll thread). |
| assert num_table_refreshes_2 > num_table_refreshes_1 |
| assert int(num_table_refreshes_2) - int(num_table_refreshes_1) < 24 |
| EventProcessorUtils.wait_for_event_processing(self) |
| # test for batching of insert events |
| batch_events_insert = EventProcessorUtils.get_int_metric(batch_events_metric) |
| tables_refreshed_insert = EventProcessorUtils.get_int_metric("tables-refreshed") |
| partitions_refreshed_insert = EventProcessorUtils.get_int_metric( |
| "partitions-refreshed") |
| self.client.execute( |
| "insert into {}.{} partition (year,month) select * from functional.alltypes".format( |
| unique_database, testtbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_after_insert = EventProcessorUtils.get_int_metric(batch_events_metric) |
| tables_refreshed_after_insert = EventProcessorUtils.get_int_metric("tables-refreshed") |
| partitions_refreshed_after_insert = EventProcessorUtils.get_int_metric( |
| "partitions-refreshed") |
| # this is a self-event tables or partitions should not be refreshed |
| assert batch_events_after_insert > batch_events_insert |
| assert tables_refreshed_after_insert == tables_refreshed_insert |
| assert partitions_refreshed_after_insert == partitions_refreshed_insert |
| # run the insert from hive to make sure that batch event is refreshing all the |
| # partitions |
| self.run_stmt_in_hive( |
| "SET hive.exec.dynamic.partition.mode=nonstrict; insert into {}.{} partition" |
| " (year,month) select * from functional.alltypes".format( |
| unique_database, testtbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| batch_events_after_hive = EventProcessorUtils.get_int_metric(batch_events_metric) |
| partitions_refreshed_after_hive = EventProcessorUtils.get_int_metric( |
| "partitions-refreshed") |
| assert batch_events_after_hive > batch_events_insert |
| # 24 partitions inserted and hence we must refresh 24 partitions once. |
| assert int(partitions_refreshed_after_hive) == int(partitions_refreshed_insert) + 24 |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5") |
| def test_event_processor_failure_extra_space(self, unique_database): |
| """This test verifies that impala event processor is in active state after |
| processing a couple of previously erroneous events""" |
| test_table = "extra_space_table" |
| # IMPALA-11939 -- create table event in HMS contains extra spaces in the db/table |
| self.run_stmt_in_hive("create table ` {}`.`{} ` (i1 int) partitioned by (year int)" |
| .format(unique_database, test_table)) |
| self.run_stmt_in_hive("alter table ` {}`.`{} ` add columns (i2 int)" |
| .format(unique_database, test_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5") |
| def test_disable_hms_sync(self, unique_database): |
| """This test verifies that impala event processor is in active state after |
| processing an alter table event that re-enables hms sync""" |
| # test 1: re-enable disableHmsSync config at table level |
| test_table = "disable_hms_sync_table" |
| self.client.execute( |
| """create table {}.{} (i int) TBLPROPERTIES ('impala.disableHmsSync'='true')""" |
| .format(unique_database, test_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| prev_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0) |
| self.run_stmt_in_hive( |
| """ALTER TABLE {}.{} SET TBLPROPERTIES('somekey'='somevalue')""" |
| .format(unique_database, test_table)) |
| self.client.execute( |
| """ALTER TABLE {}.{} SET TBLPROPERTIES ('impala.disableHmsSync'='false')""" |
| .format(unique_database, test_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| current_events_skipped = EventProcessorUtils.get_int_metric('events-skipped', 0) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| assert current_events_skipped >= prev_events_skipped + 1 |
| |
| # test 2: re-enabling disableHmsSync config on a table shouldn't put event processor |
| # in error state if the database is not loaded. |
| try: |
| test_db = "unloaded_db_sync" |
| self.run_stmt_in_hive("""create database {}""".format(test_db)) |
| self.run_stmt_in_hive("""create table {}.{} (id int) |
| TBLPROPERTIES ('impala.disableHmsSync'='true')""".format(test_db, test_table)) |
| self.run_stmt_in_hive( |
| """ALTER TABLE {}.{} SET TBLPROPERTIES ('impala.disableHmsSync'='false')""" |
| .format(test_db, test_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| finally: |
| self.run_stmt_in_hive("""drop database {} cascade""".format(test_db)) |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=10") |
| def test_event_processor_dropped_partition(self, unique_database): |
| """This test verifies that impala event processor is in active state after |
| processing partitioned insert events of a dropped table""" |
| # IMPALA-11768 -- Insert partition events should be ignored |
| # if the table is dropped |
| test_table = "partitioned_table" |
| |
| def is_event_processor_active(is_insert): |
| self.run_stmt_in_hive("create table {}.{} (i1 int) partitioned by (year int)" |
| .format(unique_database, test_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| self.client.execute("refresh {}.{}".format(unique_database, test_table)) |
| self.run_stmt_in_hive( |
| "insert into {}.{} partition(year=2023) values (4),(5),(6)" |
| .format(unique_database, test_table)) |
| data = FireEventRequestData() |
| if is_insert: |
| insert_data = InsertEventRequestData() |
| insert_data.filesAdded = "/warehouse/mytable/b1" |
| insert_data.replace = False |
| data.insertData = insert_data |
| else: |
| data.refreshEvent = True |
| req = FireEventRequest(True, data) |
| req.dbName = unique_database |
| req.tableName = test_table |
| req.partitionVals = ["2023"] |
| self.hive_client.fire_listener_event(req) |
| self.run_stmt_in_hive( |
| "drop table {}.{}".format(unique_database, test_table)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| |
| is_event_processor_active(True) |
| is_event_processor_active(False) |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1") |
| def test_iceberg_self_events(self, unique_database): |
| """This test checks that Impala doesn't refresh Iceberg tables on self events.""" |
| tbl_name = unique_database + ".test_iceberg_events" |
| iceberg_catalogs = IcebergCatalogs(unique_database) |
| |
| def check_self_events(query, skips_events=True): |
| tbls_refreshed_before, partitions_refreshed_before, \ |
| events_skipped_before = self.__get_self_event_metrics() |
| self.client.execute(query) |
| EventProcessorUtils.wait_for_event_processing(self) |
| tbls_refreshed_after, partitions_refreshed_after, \ |
| events_skipped_after = self.__get_self_event_metrics() |
| assert tbls_refreshed_before == tbls_refreshed_after |
| assert partitions_refreshed_before == partitions_refreshed_after |
| if skips_events: |
| assert events_skipped_after > events_skipped_before |
| |
| for catalog in iceberg_catalogs.get_iceberg_catalog_properties(): |
| is_hive_catalog = iceberg_catalogs.is_a_hive_catalog(catalog) |
| self.client.execute(""" |
| CREATE TABLE {0} (i int) STORED AS ICEBERG |
| TBLPROPERTIES ({1})""".format(tbl_name, catalog)) |
| |
| check_self_events("INSERT OVERWRITE {0} VALUES (1)".format(tbl_name), |
| skips_events=is_hive_catalog) |
| check_self_events("ALTER TABLE {0} ADD COLUMN j INT".format(tbl_name)) |
| check_self_events("ALTER TABLE {0} DROP COLUMN i".format(tbl_name)) |
| check_self_events("ALTER TABLE {0} CHANGE COLUMN j j BIGINT".format(tbl_name)) |
| # SET PARTITION SPEC only updates HMS in case of HiveCatalog (which sets |
| # table property 'metadata_location') |
| check_self_events( |
| "ALTER TABLE {0} SET PARTITION SPEC (truncate(2, j))".format(tbl_name), |
| skips_events=is_hive_catalog) |
| check_self_events( |
| "ALTER TABLE {0} SET TBLPROPERTIES('key'='value')".format(tbl_name)) |
| check_self_events("ALTER TABLE {0} UNSET TBLPROPERTIES('key')".format(tbl_name)) |
| check_self_events("INSERT INTO {0} VALUES (2), (3), (4)".format(tbl_name), |
| skips_events=is_hive_catalog) |
| ctas_tbl = unique_database + ".ice_ctas" |
| check_self_events("""CREATE TABLE {0} STORED AS ICEBERG |
| TBLPROPERTIES ({1}) AS SELECT * FROM {2}""".format(ctas_tbl, catalog, tbl_name)) |
| check_self_events("DROP TABLE {0}".format(ctas_tbl)) |
| check_self_events("TRUNCATE TABLE {0}".format(tbl_name), |
| skips_events=is_hive_catalog) |
| |
| self.client.execute("DROP TABLE {0}".format(tbl_name)) |
| |
| def __run_self_events_test(self, db_name, use_impala): |
| recover_tbl_name = ImpalaTestSuite.get_random_name("tbl_") |
| # create a table similar to alltypes so that we can recover the partitions on it |
| # later in one of the test queries |
| alltypes_tab_location = self.__get_tbl_location("functional", "alltypes") |
| self.client.execute( |
| "create external table {0}.{1} like functional.alltypes location '{2}'".format( |
| db_name, recover_tbl_name, alltypes_tab_location)) |
| self.client.execute("refresh {0}.{1}".format(db_name, recover_tbl_name)) |
| if use_impala: |
| queries = self.__get_impala_test_queries(db_name, recover_tbl_name) |
| # some queries do not trigger self-event evaluation (creates and drops) however, |
| # its still good to confirm that we don't do unnecessary refreshes in such cases |
| # For such queries we use a different metrics events-skipped to confirm that these |
| # events are skipped. |
| for stmt in queries[False]: |
| self.__exec_sql_and_check_selfevent_counter(stmt, use_impala, False) |
| # All the queries with True key should confirm that the events-skipped counter |
| # is also incremented |
| for stmt in queries[True]: |
| self.__exec_sql_and_check_selfevent_counter(stmt, use_impala) |
| else: |
| queries = self.__get_hive_test_queries(db_name, recover_tbl_name) |
| for stmt in queries: |
| self.__exec_sql_and_check_selfevent_counter(stmt, use_impala) |
| |
| def __get_impala_test_queries(self, db_name, recover_tbl_name): |
| tbl_name = ImpalaTestSuite.get_random_name("tbl_") |
| acid_tbl_name = ImpalaTestSuite.get_random_name("acid_tbl_") |
| acid_no_part_tbl_name = ImpalaTestSuite.get_random_name("acid_no_part_tbl_") |
| tbl2 = ImpalaTestSuite.get_random_name("tbl_") |
| view_name = ImpalaTestSuite.get_random_name("view_") |
| view2 = ImpalaTestSuite.get_random_name("view_") |
| # create a empty table for both partitioned and unpartitioned case for testing insert |
| # events |
| empty_unpartitioned_tbl = ImpalaTestSuite.get_random_name("empty_unpart_tbl_") |
| empty_partitioned_tbl = ImpalaTestSuite.get_random_name("empty_parttbl_") |
| self.client.execute( |
| "create table {0}.{1} (c1 int)".format(db_name, empty_unpartitioned_tbl)) |
| self.client.execute( |
| "create table {0}.{1} (c1 int) partitioned by (part int)".format(db_name, |
| empty_partitioned_tbl)) |
| acid_props = self.__get_transactional_tblproperties(True) |
| self_event_test_queries = { |
| # Queries which will increment the events-skipped counter |
| True: [ |
| # ALTER_DATABASE case |
| "comment on database {0} is 'self-event test database'".format(db_name), |
| "alter database {0} set owner user `test-user`".format(db_name), |
| "create function {0}.f() returns int location '{1}/libTestUdfs.so' " |
| "symbol='NoArgs'".format(db_name, WAREHOUSE), |
| "drop function {0}.f()".format(db_name), |
| # ALTER_TABLE case |
| "alter table {0}.{1} set TBLPROPERTIES ('k'='v')".format(db_name, tbl_name), |
| "alter table {0}.{1} ADD COLUMN c1 int".format(db_name, tbl_name), |
| "alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'".format(db_name, |
| tbl_name), |
| "comment on table {0}.{1} IS 'table level comment'".format(db_name, tbl_name), |
| "comment on column {0}.{1}.C1 IS 'column level comment'".format(db_name, |
| tbl_name), |
| "alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)".format(db_name, tbl_name), |
| "alter table {0}.{1} DROP COLUMN c1".format(db_name, tbl_name), |
| "alter table {0}.{1} DROP COLUMN c2".format(db_name, tbl_name), |
| "alter table {0}.{1} DROP COLUMN c3".format(db_name, tbl_name), |
| "alter table {0}.{1} set owner user `test-user`".format(db_name, tbl_name), |
| "alter table {0}.{1} set owner role `test-role`".format(db_name, tbl_name), |
| "alter view {0}.{1} set owner user `test-view-user`".format(db_name, view_name), |
| "alter view {0}.{1} set owner role `test-view-role`".format(db_name, view_name), |
| # compute stats will generates ALTER_PARTITION |
| "compute stats {0}.{1}".format(db_name, tbl_name), |
| "compute incremental stats {0}.{1}".format(db_name, tbl_name), |
| "drop stats {0}.{1}".format(db_name, tbl_name), |
| # insert into a existing partition; generates INSERT self-event |
| "insert into table {0}.{1} partition " |
| "(year, month) select * from functional.alltypessmall where year=2009 " |
| "and month=1".format(db_name, tbl_name), |
| # insert overwrite query from Impala also generates a INSERT self-event |
| "insert overwrite table {0}.{1} partition " |
| "(year, month) select * from functional.alltypessmall where year=2009 " |
| "and month=1".format(db_name, tbl_name), |
| # events processor doesn't process delete column stats events currently, |
| # however, in case of incremental stats, there could be alter table and |
| # alter partition events which should be ignored. Hence we run compute stats |
| # before to make sure that the truncate table command generated alter events |
| # are ignored. |
| "compute incremental stats {0}.{1}".format(db_name, tbl_name), |
| "truncate table {0}.{1}".format(db_name, tbl_name)], |
| False: [ |
| "create table {0}.{1} like functional.alltypessmall " |
| "stored as parquet".format(db_name, tbl_name), |
| "create view {0}.{1} as select * from functional.alltypessmall " |
| "where year=2009".format(db_name, view_name), |
| # in case of rename we process it as drop+create and hence |
| # the events-skipped counter is not updated. Instead if this event is processed, |
| # it will increment the tables-added and tables-removed counters. |
| "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl_name, tbl2), |
| "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl2, tbl_name), |
| "alter view {0}.{1} rename to {0}.{2}".format(db_name, view_name, view2), |
| "alter view {0}.{1} rename to {0}.{2}".format(db_name, view2, view_name), |
| # ADD_PARTITION cases |
| # dynamic partition insert (creates new partitions) |
| "insert into table {0}.{1} partition (year,month) " |
| "select * from functional.alltypessmall where month % 2 = 0".format(db_name, |
| tbl_name), |
| "insert overwrite table {0}.{1} partition (year,month) " |
| "select * from functional.alltypessmall where month % 2 = 1".format(db_name, |
| tbl_name), |
| # we add this statement below just to make sure that the subsequent statement is |
| # a no-op |
| "alter table {0}.{1} add if not exists partition (year=2100, month=1)".format( |
| db_name, tbl_name), |
| "alter table {0}.{1} add if not exists partition (year=2100, month=1)".format( |
| db_name, tbl_name), |
| # DROP_PARTITION cases |
| "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format( |
| db_name, tbl_name), |
| # drop non-existing partition; essentially this is a no-op |
| "alter table {0}.{1} drop if exists partition (year=2100, month=1)".format( |
| db_name, tbl_name), |
| # empty table case where no insert events are generated |
| "insert overwrite {0}.{1} select * from {0}.{1}".format( |
| db_name, empty_unpartitioned_tbl), |
| "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format( |
| db_name, empty_partitioned_tbl), |
| # in case of ACID tables no INSERT event is generated as the COMMIT event |
| # contains the related data |
| "create table {0}.{1} (c1 int) {2}".format(db_name, |
| acid_no_part_tbl_name, acid_props), |
| "insert into table {0}.{1} values (1) ".format(db_name, acid_no_part_tbl_name), |
| "insert overwrite table {0}.{1} select * from {0}.{1}".format( |
| db_name, acid_no_part_tbl_name), |
| "truncate table {0}.{1}".format(db_name, acid_no_part_tbl_name), |
| # the table is empty so the following insert adds 0 rows |
| "insert overwrite table {0}.{1} select * from {0}.{1}".format( |
| db_name, acid_no_part_tbl_name), |
| "create table {0}.{1} (c1 int) partitioned by (part int) {2}".format(db_name, |
| acid_tbl_name, acid_props), |
| "insert into table {0}.{1} partition (part=1) " |
| "values (1) ".format(db_name, acid_tbl_name), |
| "insert into table {0}.{1} partition (part) select id, int_col " |
| "from functional.alltypestiny".format(db_name, acid_tbl_name), |
| # repeat the same insert, now it writes to existing partitions |
| "insert into table {0}.{1} partition (part) select id, int_col " |
| "from functional.alltypestiny".format(db_name, acid_tbl_name), |
| # following insert overwrite is used instead of truncate, because truncate |
| # leads to a non-self event that reloads the table |
| "insert overwrite table {0}.{1} partition (part) select id, int_col " |
| "from functional.alltypestiny where id=-1".format(db_name, acid_tbl_name), |
| "insert overwrite table {0}.{1} partition (part) select id, int_col " |
| "from functional.alltypestiny".format(db_name, acid_tbl_name), |
| "insert overwrite {0}.{1} partition(part) select * from {0}.{1}".format( |
| db_name, acid_tbl_name), |
| # recover partitions will generate add_partition events |
| "alter table {0}.{1} recover partitions".format(db_name, recover_tbl_name) |
| ] |
| } |
| return self_event_test_queries |
| |
| def __get_hive_test_queries(self, db_name, recover_tbl_name): |
| tbl_name = ImpalaTestSuite.get_random_name("hive_test_tbl_") |
| tbl2 = ImpalaTestSuite.get_random_name("hive_renamed_tbl_") |
| view_name = ImpalaTestSuite.get_random_name("hive_view_") |
| # we use a custom table schema to make it easier to change columns later in the |
| # test_queries |
| self.client.execute("create table {0}.{1} (key int) partitioned by " |
| "(part int) stored as parquet".format(db_name, tbl_name)) |
| self.client.execute( |
| "create view {0}.{1} as select * from functional.alltypessmall where year=2009" |
| .format(db_name, view_name)) |
| # events-processor only refreshes loaded tables, hence its important to issue a |
| # refresh here so that table is in loaded state |
| self.client.execute("refresh {0}.{1}".format(db_name, tbl_name)) |
| self_event_test_queries = [ |
| # ALTER_DATABASE cases |
| "alter database {0} set dbproperties ('comment'='self-event test " |
| "database')".format(db_name), |
| "alter database {0} set owner user `test-user`".format(db_name), |
| # ALTER_TABLE case |
| "alter table {0}.{1} set tblproperties ('k'='v')".format(db_name, tbl_name), |
| "alter table {0}.{1} add columns (value string)".format(db_name, tbl_name), |
| "alter table {0}.{1} set owner user `test-user`".format(db_name, tbl_name), |
| "alter table {0}.{1} set owner role `test-role`".format(db_name, tbl_name), |
| # need to set this config to make sure the dynamic partition insert works below |
| "set hive.exec.dynamic.partition.mode=nonstrict", |
| # ADD_PARTITION cases |
| "insert into table {0}.{1} partition (part=2009) " |
| "select id as key, string_col as value from functional.alltypessmall".format( |
| db_name, tbl_name), |
| # add partition |
| "alter table {0}.{1} add if not exists partition (part=1111)".format( |
| db_name, tbl_name), |
| # add existing partition; essentially this is a no-op |
| "alter table {0}.{1} add if not exists partition (part=1111)".format( |
| db_name, tbl_name), |
| # DROP_PARTITION cases |
| "alter table {0}.{1} drop if exists partition (part=1111)".format( |
| db_name, tbl_name), |
| # drop non-existing partition; essentially this is a no-op |
| "alter table {0}.{1} drop if exists partition (part=1111)".format( |
| db_name, tbl_name), |
| # compute stats will generates ALTER_PARTITION |
| "analyze table {0}.{1} compute statistics for columns".format(db_name, tbl_name), |
| "msck repair table {0}.{1}".format(db_name, recover_tbl_name), |
| # we rename in the end since impala will have the new table in unloaded |
| # state after rename and hence any events later will be ignored anyways. |
| "alter table {0}.{1} rename to {0}.{2}".format(db_name, tbl_name, tbl2), |
| "alter view {0}.{1} rename to {0}.{2}".format(db_name, view_name, |
| ImpalaTestSuite.get_random_name("view_")), |
| ] |
| return self_event_test_queries |
| |
| @staticmethod |
| def __get_self_event_metrics(): |
| """ |
| Gets the tables-refreshed, partitions-refreshed and events-skipped metric values |
| from Metastore EventsProcessor |
| """ |
| tbls_refreshed_count = EventProcessorUtils.get_int_metric('tables-refreshed', 0) |
| partitions_refreshed_count = EventProcessorUtils.get_int_metric( |
| 'partitions-refreshed', 0) |
| events_skipped_count = EventProcessorUtils.get_int_metric('events-skipped', 0) |
| return int(tbls_refreshed_count), int(partitions_refreshed_count), \ |
| int(events_skipped_count) |
| |
| def __exec_sql_and_check_selfevent_counter(self, stmt, use_impala_client, |
| check_events_skipped_counter=True): |
| """ |
| Method runs a given query statement using a impala client or hive client based on the |
| argument use_impala_client and confirms if the self-event related counters are as |
| expected based on whether we expect a self-event or not. If the |
| check_self_event_counter is False it skips checking the events-skipped metric. |
| """ |
| EventProcessorUtils.wait_for_event_processing(self) |
| tbls_refreshed, partitions_refreshed, \ |
| events_skipped = self.__get_self_event_metrics() |
| last_synced_event = EventProcessorUtils.get_last_synced_event_id() |
| logging.info("Running statement in {1}: {0}".format(stmt, |
| "impala" if use_impala_client else "hive")) |
| if not use_impala_client: |
| self.run_stmt_in_hive(stmt) |
| else: |
| self.client.execute(stmt) |
| |
| EventProcessorUtils.wait_for_event_processing(self) |
| tbls_refreshed_after, partitions_refreshed_after, \ |
| events_skipped_after = self.__get_self_event_metrics() |
| last_synced_event_after = EventProcessorUtils.get_last_synced_event_id() |
| # we assume that any event which comes due to stmts run from impala-client are |
| # self-events |
| logging.info( |
| "Event id before {0} event id after {1}".format(last_synced_event, |
| last_synced_event_after)) |
| if use_impala_client: |
| # self-event counter must increase if this is a self-event if |
| # check_self_event_counter is set |
| # some of the test queries generate no events at all. If that is the case |
| # skip the below comparison |
| if last_synced_event_after > last_synced_event: |
| if check_events_skipped_counter: |
| assert events_skipped_after > events_skipped, \ |
| "Failing query(impala={}): {}".format(use_impala_client, stmt) |
| # if this is a self-event, no table or partitions should be refreshed |
| assert tbls_refreshed == tbls_refreshed_after, \ |
| "Failing query(impala={}): {}".format(use_impala_client, stmt) |
| assert partitions_refreshed == partitions_refreshed_after, \ |
| "Failing query(impala={}): {}".format(use_impala_client, stmt) |
| else: |
| # hive was used to run the stmts, any events generated should not have been deemed |
| # as self events unless there are empty partition add/drop events |
| assert events_skipped <= events_skipped_after |
| |
| def __get_tbl_location(self, db_name, tbl_name): |
| assert self.hive_client is not None |
| return self.hive_client.get_table(db_name, tbl_name).sd.location |
| |
| def __get_transactional_tblproperties(self, is_transactional): |
| """ |
| Util method to generate the tblproperties for transactional tables |
| """ |
| tblproperties = "" |
| if is_transactional: |
| tblproperties = "tblproperties ('transactional'='true'," \ |
| "'transactional_properties'='insert_only')" |
| return tblproperties |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5") |
| def test_stale_drop_partition_events(self, unique_database): |
| """Regression Tests for IMPALA-12256. Verifies stale DROP_PARTITION events are |
| skipped even if they are processed late after some other DDLs. Uses a higher polling |
| interval to ensure late processing on the events""" |
| self.client.execute( |
| "create table %s.part(i int) partitioned by (p int) stored as textfile" |
| % unique_database) |
| self.client.execute( |
| "insert into %s.part partition (p=0) values (0)" % unique_database) |
| # These DDLs will reload the partition metadata. We will verify they don't lose |
| # the create event ids after the reload. |
| partition_ddls = [ |
| "compute stats %s.part" % unique_database, |
| "compute incremental stats %s.part" % unique_database, |
| "compute incremental stats %s.part partition(p=0)" % unique_database, |
| "alter table %s.part partition(p=0) set row format" |
| " delimited fields terminated by ','" % unique_database, |
| "alter table %s.part partition(p=0) set fileformat parquet" % unique_database, |
| "alter table %s.part partition(p=0) set location '/tmp'" % unique_database, |
| "alter table %s.part partition(p=0) set tblproperties('k'='v')" % unique_database, |
| "refresh %s.part partition(p=0)" % unique_database, |
| "refresh %s.part" % unique_database, |
| ] |
| # Wait until the events in preparing the table are consumed. |
| EventProcessorUtils.wait_for_event_processing(self) |
| parts_added_before = EventProcessorUtils.get_int_metric("partitions-added") |
| parts_refreshed_before = EventProcessorUtils.get_int_metric("partitions-refreshed") |
| parts_removed_before = EventProcessorUtils.get_int_metric("partitions-removed") |
| for ddl in partition_ddls: |
| events_skipped_before = EventProcessorUtils.get_int_metric("events-skipped") |
| # Drop-create the partition and then runs a DDL on it. A DROP_PARTITION and an |
| # ADD_PARTITION event will be generated and should be skipped. The 3rd DDL might |
| # generate an ALTER_PARTITION event but it should be skipped as self-event. |
| # Note that we don't perform self-event detection on ADD/DROP_PARTITION events. |
| # They are skipped based on the partition level create event ids. So we should see |
| # no partitions are added/removed/refreshed if we correctly track the create event |
| # id (saved by the 2nd DDL that creates the partition). |
| # For the DROP_PARTITION event, there are 3 cases: |
| # 1) The DROP_PARTITION event is processed before the INSERT statement. |
| # It's skipped since the partition doesn't exist. |
| # 2) The DROP_PARTITION event is processed after the INSERT statement |
| # and before the 3rd DDL. The INSERT statement creates the partition so saves |
| # the create event id which is higher than the id of the DROP_PARTITION event. |
| # Thus the DROP_PARTITION event is skipped. |
| # 3) The DROP_PARTITION event is processed after the 3rd DDL. The reload triggered |
| # by the DDL should keep track of the create event id so the DROP_PARTITION event |
| # can be skipped. |
| # This test sets hms_event_polling_interval_s to 5 which is long enough for the |
| # 3 DDLs to finish. So it's more likely the 3rd case would happen, which is the |
| # case of IMPALA-12256. |
| self.client.execute( |
| "alter table %s.part drop partition (p=0)" % unique_database) |
| self.client.execute( |
| "insert into %s.part partition(p=0) values (1),(2)" % unique_database) |
| self.client.execute(ddl) |
| EventProcessorUtils.wait_for_event_processing(self) |
| events_skipped_after = EventProcessorUtils.get_int_metric("events-skipped") |
| parts_added_after = EventProcessorUtils.get_int_metric("partitions-added") |
| parts_refreshed_after = EventProcessorUtils.get_int_metric("partitions-refreshed") |
| parts_removed_after = EventProcessorUtils.get_int_metric("partitions-removed") |
| # Event-processor should not update any partitions since all events should be |
| # skipped |
| assert parts_removed_before == parts_removed_after |
| assert parts_added_before == parts_added_after |
| assert parts_refreshed_before == parts_refreshed_after |
| assert events_skipped_after > events_skipped_before |
| |
| @CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=5") |
| def test_truncate_table_from_hive(self, unique_database): |
| """IMPALA-12636: verify truncate table from hive reloads file metadata in Impala""" |
| hive_tbl = "tbl_in_hive" |
| values = "values (10),(20),(30)" |
| |
| def verify_truncate_op_in_hive(tbl_name, is_transactional, is_partitioned, |
| is_batched): |
| create_query = " ".join(["create", "table `{}`.`{}` (i int)", |
| " partitioned by (year int) " if is_partitioned else '', |
| self.__get_transactional_tblproperties(is_transactional)]) |
| self.execute_query(create_query.format(unique_database, tbl_name)) |
| insert_query = " ".join(["insert into `{}`.`{}`", "partition (year=2024)" |
| if is_partitioned else '', values]) |
| self.run_stmt_in_hive(insert_query.format(unique_database, tbl_name)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| self.client.execute("refresh {}.{}".format(unique_database, tbl_name)) |
| truncate_query = " ".join(["truncate table `{}`.`{}`", "partition (year=2024)" |
| if is_partitioned else '']) |
| self.run_stmt_in_hive(truncate_query.format(unique_database, tbl_name)) |
| if is_batched: |
| self.run_stmt_in_hive( |
| "insert into {}.{} partition (year=2024) values (1),(2)" |
| .format(unique_database, tbl_name)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| data = int(self.execute_scalar("select count(*) from {0}.{1}".format( |
| unique_database, tbl_name))) |
| assert data == 2 if is_batched else data == 0 |
| self.client.execute("drop table {}.{}".format(unique_database, tbl_name)) |
| # Case-I: truncate single partition |
| verify_truncate_op_in_hive(hive_tbl, False, False, False) |
| verify_truncate_op_in_hive(hive_tbl, True, False, False) |
| verify_truncate_op_in_hive(hive_tbl, False, True, False) |
| verify_truncate_op_in_hive(hive_tbl, False, True, True) |
| verify_truncate_op_in_hive(hive_tbl, True, True, False) |
| verify_truncate_op_in_hive(hive_tbl, True, True, True) |
| |
| # Case-II: truncate partition in multi partition |
| hive_tbl = "multi_part_tbl" |
| self.client.execute("create table {}.{} (i int) partitioned by " |
| "(p int, q int)".format(unique_database, hive_tbl)) |
| self.client.execute("insert into {}.{} partition(p, q) values " |
| "(0,0,0), (0,0,1), (0,0,2)".format(unique_database, hive_tbl)) |
| self.client.execute("insert into {}.{} partition(p, q) values " |
| "(0,1,0), (0,1,1)".format(unique_database, hive_tbl)) |
| self.run_stmt_in_hive("truncate table {}.{} partition(p=0)" |
| .format(unique_database, hive_tbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| data = int(self.execute_scalar("select count(*) from {0}.{1}".format( |
| unique_database, hive_tbl))) |
| assert data == 2 |
| self.run_stmt_in_hive("truncate table {}.{}" |
| .format(unique_database, hive_tbl)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| data = int(self.execute_scalar("select count(*) from {0}.{1}".format( |
| unique_database, hive_tbl))) |
| assert data == 0 |
| |
| @SkipIf.is_test_jdk |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--hms_event_polling_interval_s=100", |
| hive_conf_dir=HIVE_SITE_HOUSEKEEPING_ON) |
| def test_commit_compaction_with_abort_txn(self, unique_database): |
| """Use a long enough polling interval to allow Hive statements to finish before |
| the ABORT_TXN event is processed. In local tests, the Hive statements usually |
| finish in 60s. |
| TODO: improve this by adding commands to pause and resume the event-processor.""" |
| tbl = "part_table" |
| fq_tbl = unique_database + '.' + tbl |
| acid = AcidTxn(self.hive_client) |
| self.run_stmt_in_hive( |
| "create transactional table {} (i int) partitioned by (p int)".format(fq_tbl)) |
| |
| # Allocate a write id on this table and abort the txn |
| txn_id = acid.open_txns() |
| acid.allocate_table_write_ids(txn_id, unique_database, tbl) |
| acid.abort_txn(txn_id) |
| |
| # Insert some rows and trigger compaction |
| for i in range(2): |
| self.run_stmt_in_hive( |
| "insert into {} partition(p=0) values (1),(2),(3)".format(fq_tbl)) |
| self.run_stmt_in_hive( |
| "alter table {} partition(p=0) compact 'major' and wait".format(fq_tbl)) |
| |
| # The CREATE_TABLE event hasn't been processed yet so we have to explictily invalidate |
| # the table first. |
| self.client.execute("invalidate metadata " + fq_tbl) |
| # Reload the table so the latest valid writeIdList is loaded |
| self.client.execute("refresh " + fq_tbl) |
| # Process the ABORT_TXN event |
| EventProcessorUtils.wait_for_event_processing(self, timeout=100) |
| assert EventProcessorUtils.get_event_processor_status() == "ACTIVE" |
| # Uncomment this once we can stop and resume the event-processor using commands. |
| # Currently the test is flaky with it since the Hive statements could take longer to |
| # finish than 100s (e.g. I saw a run of 5mins). |
| # self.assert_catalogd_log_contains("INFO", "Not added ABORTED write id 1 since it's " |
| # + "not opened and might already be cleaned up") |
| |
| @CustomClusterTestSuite.with_args( |
| catalogd_args="--hms_event_incremental_refresh_transactional_table=false") |
| def test_no_hms_event_incremental_refresh_transactional_table(self, unique_database): |
| """IMPALA-12835: Test that Impala notices inserts to acid tables when |
| hms_event_incremental_refresh_transactional_table is false. |
| """ |
| for partitioned in [False, True]: |
| tbl = "part_tbl" if partitioned else "tbl" |
| fq_tbl = unique_database + '.' + tbl |
| part_create = " partitioned by (p int)" if partitioned else "" |
| part_insert = " partition (p = 1)" if partitioned else "" |
| |
| self.run_stmt_in_hive( |
| "create transactional table {} (i int){}".format(fq_tbl, part_create)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| |
| # Load the table in Impala before INSERT |
| self.client.execute("refresh " + fq_tbl) |
| self.run_stmt_in_hive( |
| "insert into {}{} values (1),(2),(3)".format(fq_tbl, part_insert)) |
| EventProcessorUtils.wait_for_event_processing(self) |
| |
| results = self.client.execute("select i from " + fq_tbl) |
| assert results.data == ["1", "2", "3"] |