| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from __future__ import absolute_import, division, print_function |
| |
| from builtins import range |
| |
| from impala_thrift_gen.hive_metastore.ttypes import FireEventRequest, FireEventRequestData |
| from tests.common.custom_cluster_test_suite import CustomClusterTestSuite |
| from tests.common.skip import SkipIfCatalogV2, SkipIfFS |
| from tests.metadata.test_event_processing_base import TestEventProcessingBase |
| from tests.util.acid_txn import AcidTxn |
| from tests.util.event_processor_utils import EventProcessorUtils |
| |
| |
@SkipIfFS.hive
class TestEventProcessingError(CustomClusterTestSuite):
  """
  Tests to verify that the event processor does not stay in an error state when
  there are runtime exceptions while processing events. Each test injects a
  failure for a specific HMS event type via
  --inject_process_event_failure_event_types, then verifies that the event
  processor recovers to ACTIVE and that the effects of the event are still
  visible in Impala.
  """
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--invalidate_metadata_on_event_processing_failure=false "
                  "--inject_process_event_failure_event_types='ALTER_TABLE' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_sanity_check(self, unique_database):
    """Sanity check: with automatic invalidation on failure disabled, an
    injected ALTER_TABLE failure puts the event processor into ERROR state,
    and a manual INVALIDATE METADATA brings it back to ACTIVE."""
    tbl_name = "hive_alter_table"
    self.__create_table_and_load__(unique_database, tbl_name, False, False)
    self.run_stmt_in_hive(
      "alter table {}.{} set owner user `test-user`"
      .format(unique_database, tbl_name))
    try:
      # Waiting can raise because the event processor is stuck in ERROR state.
      EventProcessorUtils.wait_for_event_processing(self)
    except Exception:
      assert EventProcessorUtils.get_event_processor_status() == "ERROR"
      # Manual global invalidate is the documented recovery path here.
      self.client.execute("INVALIDATE METADATA")
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ALTER_TABLE' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alter_table(self, unique_database):
    """Tests event processor recovery from an injected failure on an
    alter table event."""
    tbl_name = "hive_alter_table"
    self.__create_table_and_load__(unique_database, tbl_name, False, False)
    self.run_stmt_in_hive(
      "alter table {}.{} set owner user `test-user`"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # The owner change from Hive must still be applied despite the injected
    # processing failure.
    result = self.client.execute("describe formatted {}.{}"
                                 .format(unique_database, tbl_name))
    self.verify_owner_property(result, 'test-user')

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ADD_PARTITION' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_add_partition(self, unique_database):
    """Tests event processor recovery from an injected failure on an
    add partition event."""
    tbl_name = "hive_table_add_partition"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.client.execute("describe {}.{}".format(unique_database, tbl_name))
    self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
                                 .format(unique_database, tbl_name))
    # First line is the header. Only one partition should be shown so the
    # result has two lines.
    assert "hive_table_add_partition/year=2024" in result.get_data()
    assert len(result.data) == 2

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ALTER_PARTITION' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alter_partition(self, unique_database):
    """Tests event processor recovery from an injected failure on an
    alter partition event."""
    tbl_name = "hive_table_alter_partition"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)"
      .format(unique_database, tbl_name))
    # Computing stats in Hive generates ALTER_PARTITION events.
    self.run_stmt_in_hive(
      "analyze table {}.{} compute statistics"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
                                 .format(unique_database, tbl_name))
    assert "2024" in result.get_data()
    assert len(result.data) == 2

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ALTER_PARTITIONS' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alter_partitions(self, unique_database):
    """Tests event processor recovery from an injected failure on a batch
    alter partitions event."""
    tbl_name = "hive_table_alter_partitions"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    for i in range(5):
      self.client.execute(
        "alter table {}.{} add partition(year={})"
        .format(unique_database, tbl_name, i))
    # Computing stats over several partitions generates a batched
    # ALTER_PARTITIONS event.
    self.run_stmt_in_hive(
      "analyze table {}.{} compute statistics"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("show partitions {}.{}"
                                 .format(unique_database, tbl_name))
    for i in range(5):
      assert str(i) in result.get_data()
    # Header line plus the five partitions added above.
    assert len(result.data) == 6

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='INSERT' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_insert_event(self, unique_database):
    """Tests event processor recovery from an injected failure on an
    insert event."""
    tbl_name = "hive_table_insert"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    for _ in range(2):
      self.client.execute(
        "insert into {}.{} partition(year=2024) values (1),(2),(3)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # Both inserts (3 rows each) must be visible.
    result = self.client.execute("select count(*) from {}.{}"
                                 .format(unique_database, tbl_name))
    assert result.data[0] == '6'

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='INSERT_PARTITIONS' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_insert_events(self, unique_database):
    """Tests event processor recovery from an injected failure on a batched
    insert partitions event."""
    tbl_name = "hive_table_insert_partitions"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    for _ in range(2):
      self.client.execute(
        "insert into {}.{} partition(year=2024) values (1),(2),(3)"
        .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # Both inserts (3 rows each) must be visible.
    result = self.client.execute("select count(*) from {}.{}"
                                 .format(unique_database, tbl_name))
    assert result.data[0] == '6'

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='DROP_PARTITION' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_drop_partition(self, unique_database):
    """Tests event processor recovery from an injected failure on a
    drop partition event."""
    tbl_name = "hive_table_drop_partition"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)"
      .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
      "alter table {}.{} drop partition(year=2024)"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # The dropped partition must not be visible; only the header line remains.
    result = self.client.execute("show partitions {}.{}"
                                 .format(unique_database, tbl_name))
    assert "2024" not in result.get_data()
    assert len(result.data) == 1

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='RELOAD' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_reload_event(self, unique_database):
    """Tests event processor recovery from injected failures on reload events
    fired at table level, partition level and as a table-level invalidate."""
    tbl_name = "hive_table_reload_event"
    self.__create_table_and_load__(unique_database, tbl_name, False, True)
    self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)"
      .format(unique_database, tbl_name))
    # Refresh at table level
    data = FireEventRequestData()
    data.refreshEvent = True
    req = FireEventRequest(True, data)
    req.dbName = unique_database
    req.tableName = tbl_name
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    # refresh at partition level
    req.partitionVals = ["2024"]
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    # invalidate at table level
    data.refreshEvent = False
    req = FireEventRequest(True, data)
    req.dbName = unique_database
    req.tableName = tbl_name
    self.hive_client.fire_listener_event(req)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    # The table must still be usable after the injected failures.
    self.client.execute("describe formatted {}.{}"
                        .format(unique_database, tbl_name))

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types="
                  "'COMMIT_COMPACTION_EVENT' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_commit_compaction_event(self, unique_database):
    """Tests event processor recovery from an injected failure on a
    commit compaction event."""
    tbl_name = "hive_table_commit_compaction"
    self.__create_table_and_load__(unique_database, tbl_name, True, False)
    # Two inserts create multiple deltas so a minor compaction has work to do.
    for _ in range(2):
      self.run_stmt_in_hive(
        "insert into {}.{} values (1),(2),(3)"
        .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
      "alter table {}.{} compact 'minor' and wait"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
                        .format(unique_database, tbl_name))

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ALLOC_WRITE_ID_EVENT' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_alloc_write_id_event(self, unique_database):
    """Tests event processor recovery from an injected failure on an
    allocate write id event."""
    tbl_name = "hive_table_alloc_write_id"
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    acid = AcidTxn(self.hive_client)
    txn_id = acid.open_txns()
    acid.allocate_table_write_ids(txn_id, unique_database, tbl_name)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
                        .format(unique_database, tbl_name))

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='COMMIT_TXN' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_commit_txn(self, unique_database):
    """Tests event processor recovery from an injected failure on a
    commit transaction event."""
    tbl_name = "hive_table_commit_txn"
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    # A Hive insert into a transactional table commits a transaction.
    self.run_stmt_in_hive(
      "insert into {}.{} partition (year=2022) values (1),(2),(3)"
      .format(unique_database, tbl_name))
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
                        .format(unique_database, tbl_name))

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_event_types='ABORT_TXN' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_abort_txn_event(self, unique_database):
    """Tests event processor recovery from an injected failure on an
    abort transaction event."""
    tbl_name = "hive_table_abort_txn"
    acid = AcidTxn(self.hive_client)
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    txn_id = acid.open_txns()
    acid.abort_txn(txn_id)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    self.client.execute("describe formatted {}.{}"
                        .format(unique_database, tbl_name))

  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--invalidate_metadata_on_event_processing_failure=false "
                  "--invalidate_global_metadata_on_event_processing_failure=true "
                  "--inject_process_event_failure_event_types="
                  "'ALTER_TABLE, ADD_PARTITION' "
                  "--hms_event_polling_interval_s=2")
  def test_event_processor_error_global_invalidate(self, unique_database):
    """Test to verify that auto global invalidate puts EP back to active
    when it goes into error state."""
    tbl_name = "hive_table_global_invalidate"
    self.__create_table_and_load__(unique_database, tbl_name, True, True)
    self.run_stmt_in_hive(
      "alter table {}.{} set owner user `test-user`"
      .format(unique_database, tbl_name))
    self.run_stmt_in_hive(
      "alter table {}.{} add partition(year=2024)"
      .format(unique_database, tbl_name))
    # The processor may transiently report ERROR before the automatic global
    # invalidate recovers it.
    EventProcessorUtils.wait_for_event_processing(self, error_status_possible=True)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    result = self.client.execute("describe formatted {}.{}"
                                 .format(unique_database, tbl_name))
    self.verify_owner_property(result, 'test-user')
    result = self.client.execute("show partitions {}.{}"
                                 .format(unique_database, tbl_name))
    assert "2024" in result.get_data()

  @SkipIfFS.hive
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal "
                  "--inject_process_event_failure_ratio=0.5 "
                  "--inject_process_event_failure_event_types="
                  "'ALTER_TABLE,ADD_PARTITION,"
                  "ALTER_PARTITION,INSERT,ABORT_TXN,COMMIT_TXN'")
  def test_event_processor_error_stress_test(self, unique_database):
    """Executes inserts for transactional tables and external tables. Also runs
    replication tests. Failures are injected randomly (ratio 0.5) across
    several event types.
    """
    # inserts on transactional tables
    TestEventProcessingBase._run_test_insert_events_impl(self, unique_database, True)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    try:
      test_db = unique_database + "_no_transact"
      self.run_stmt_in_hive("""create database {}""".format(test_db))
      # inserts on external tables
      TestEventProcessingBase._run_test_insert_events_impl(self, test_db, False)
      assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    finally:
      self.run_stmt_in_hive("""drop database {} cascade""".format(test_db))
    # replication related tests
    TestEventProcessingBase._run_event_based_replication_tests_impl(self,
      self.filesystem_client)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  def __create_table_and_load__(self, db_name, table_name, is_transactional,
      is_partitioned):
    """Creates the given table in Hive (optionally transactional and/or
    partitioned by 'year'), waits for the create event to be processed and
    then loads the table in Impala via a describe statement."""
    create_query = " ".join(["create", " transactional " if is_transactional else '',
      "table `{}`.`{}`(i int)", " partitioned by (year int) " if is_partitioned else ''])
    self.run_stmt_in_hive(create_query.format(db_name, table_name))
    EventProcessorUtils.wait_for_event_processing(self)
    # Make the table loaded
    self.client.execute("describe {}.{}".format(db_name, table_name))

  @staticmethod
  def verify_owner_property(result, user_name):
    """Asserts that the 'describe formatted' output in 'result' contains an
    Owner row whose value equals 'user_name'."""
    match = False
    for row in result.data:
      fields = row.split("\t")
      if "Owner:" in fields[0]:
        assert user_name == fields[1].strip()
        match = True
    # Fixed: the original 'assert True, match' could never fail because 'True'
    # was the asserted condition and 'match' only the failure message.
    assert match, "No 'Owner:' row found in describe formatted output"