| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import random |
| import string |
| import pytest |
| |
| from tests.common.skip import SkipIfHive2 |
| from tests.common.custom_cluster_test_suite import CustomClusterTestSuite |
| from tests.common.environ import HIVE_MAJOR_VERSION |
| from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon, |
| SkipIfGCS, SkipIfLocal) |
| from tests.util.hive_utils import HiveDbWrapper |
| from tests.util.event_processor_utils import EventProcessorUtils |
| from tests.util.filesystem_utils import WAREHOUSE |
| |
| |
| @SkipIfS3.hive |
| @SkipIfABFS.hive |
| @SkipIfADLS.hive |
| @SkipIfGCS.hive |
| @SkipIfIsilon.hive |
| @SkipIfLocal.hive |
| class TestEventProcessing(CustomClusterTestSuite): |
| """This class contains tests that exercise the event processing mechanism in the |
| catalog.""" |
| CATALOG_URL = "http://localhost:25020" |
| PROCESSING_TIMEOUT_S = 10 |
| |
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
  impalad_args="--blacklisted_dbs=testBlackListedDb "
               "--blacklisted_tables=functional_parquet.testBlackListedTbl",
  catalogd_args="--blacklisted_dbs=testBlackListedDb "
                "--blacklisted_tables=functional_parquet.testBlackListedTbl "
                "--hms_event_polling_interval_s=1")
def test_events_on_blacklisted_objects(self):
  """Issues Hive DDLs against a blacklisted database and a blacklisted table and checks
  that the event processor stays ACTIVE and keeps advancing its last synced event id,
  while the blacklisted objects do not show up in Impala.
  """
  def run_in_hive_and_sync(*stmts):
    """Runs 'stmts' in Hive in order, waits for event processing and asserts that the
    processor is ACTIVE and has synced past the event id seen before the statements."""
    synced_before = EventProcessorUtils.get_last_synced_event_id()
    for stmt in stmts:
      self.run_stmt_in_hive(stmt)
    EventProcessorUtils.wait_for_event_processing(self)
    assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
    assert EventProcessorUtils.get_last_synced_event_id() > synced_before

  try:
    # Database-level event on the blacklisted db (spelled with a different case).
    run_in_hive_and_sync("create database TESTblackListedDb")
    assert "TESTblackListedDb".lower() not in self.all_db_names()

    # Table-level events: tables inside the blacklisted db (again with mixed case) and
    # the blacklisted table itself, including a partition-level event on it.
    run_in_hive_and_sync(
      "create table testBlackListedDb.testtbl (id int)",
      "create table TESTBLACKlISTEDDb.t2 (id int)",
      "create table functional_parquet.testBlackListedTbl (id int, val string)"
      " partitioned by (part int) stored as parquet",
      "alter table functional_parquet.testBlackListedTbl add partition (part=1)")
    listed_tables = self.client.execute("show tables in functional_parquet").get_data()
    assert "testBlackListedTbl".lower() not in listed_tables

    # Drop event on the blacklisted table, using yet another case variation.
    run_in_hive_and_sync("drop table functional_parquet.TESTBlackListedTbl")
  finally:
    self.run_stmt_in_hive("drop database testBlackListedDb cascade")
    self.run_stmt_in_hive("drop table functional_parquet.testBlackListedTbl")
| |
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
@SkipIfHive2.acid
def test_transactional_insert_events(self):
  """Runs the shared insert-event scenario ('run_test_insert_events') with
  transactional tables."""
  self.run_test_insert_events(True)
| |
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_insert_events(self):
  """Runs the shared insert-event scenario ('run_test_insert_events') with
  non-transactional tables."""
  self.run_test_insert_events(is_transactional=False)
| |
def run_test_insert_events(self, is_transactional=False):
  """Checks that inserts done in Hive become visible in Impala once the generated
  events have been processed. Covered statements:
    insert into table        --> unpartitioned and partitioned table
    insert overwrite table   --> unpartitioned and partitioned table
    insert into partition    --> new and already existing partition
  'is_transactional' controls whether the tables are created with the transactional
  table properties.
  """
  db_name = self.__get_random_name("insert_event_db_")
  tblproperties = self.__get_transactional_tblproperties(is_transactional)

  def impala_row(query):
    """Waits until pending events are processed, then returns the tab-separated
    columns of the scalar result of 'query' as a list of strings."""
    EventProcessorUtils.wait_for_event_processing(self)
    return self.execute_scalar(query).split('\t')

  with HiveDbWrapper(self, db_name):
    # ---- unpartitioned table ----
    nopart_tbl = "%s.%s" % (db_name, 'tbl_insert_nopart')
    self.run_stmt_in_hive("drop table if exists %s" % nopart_tbl)
    self.run_stmt_in_hive("create table %s (id int, val int) %s"
                          % (nopart_tbl, tblproperties))
    # Plain insert; Hive fires an insert event for it.
    self.run_stmt_in_hive("insert into %s values(101, 200)" % nopart_tbl)
    assert impala_row("select * from %s" % nopart_tbl) == ['101', '200']

    # Insert overwrite replaces the previously inserted row.
    self.run_stmt_in_hive("insert overwrite table %s values(101, 201)" % nopart_tbl)
    assert impala_row("select * from %s" % nopart_tbl) == ['101', '201']

    # ---- partitioned table ----
    part_tbl = "%s.%s" % (db_name, 'tbl_insert_part')
    self.run_stmt_in_hive("drop table if exists %s" % part_tbl)
    self.run_stmt_in_hive("create table %s (id int, name string) "
                          "partitioned by(day int, month int, year int) %s"
                          % (part_tbl, tblproperties))
    # Insert that creates the partition.
    self.run_stmt_in_hive("insert into %s partition(day=28, month=03, year=2019)"
                          "values(101, 'x')" % part_tbl)
    assert impala_row("select * from %s" % part_tbl) == \
        ['101', 'x', '28', '3', '2019']

    # Insert into the partition that now exists.
    self.run_stmt_in_hive("insert into %s partition(day=28, month=03, year=2019)"
                          "values(102, 'y')" % part_tbl)
    assert impala_row("select count(*) from %s where day=28 and month=3 "
                      "and year=2019" % part_tbl) == ['2']

    # Insert overwrite into the existing partition.
    self.run_stmt_in_hive("insert overwrite table %s partition(day=28, month=03, "
                          "year=2019)" "values(101, 'z')" % part_tbl)
    assert impala_row("select * from %s where day=28 and month=3 and"
                      " year=2019 and id=101" % part_tbl) == \
        ['101', 'z', '28', '3', '2019']
| |
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
@SkipIfHive2.acid
def test_empty_partition_events_transactional(self, unique_database):
  """Runs the empty-partition add/drop scenario against a transactional table."""
  self._run_test_empty_partition_events(unique_database, is_transactional=True)
| |
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_empty_partition_events(self, unique_database):
  """Runs the empty-partition add/drop scenario against a non-transactional table."""
  self._run_test_empty_partition_events(unique_database, is_transactional=False)
| |
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_self_events(self, unique_database):
  """Runs event-generating statements first from Impala and then from Hive. For the
  Impala run the tables and partitions must not be refreshed (the events are
  self-events); for the Hive run the events must not be counted as self-events."""
  for use_impala in (True, False):
    self.__run_self_events_test(unique_database, use_impala)
| |
@pytest.mark.xfail(run=False, reason="This is failing due to HIVE-23995")
@CustomClusterTestSuite.with_args(catalogd_args="--hms_event_polling_interval_s=1")
def test_event_based_replication(self):
  """Entry point for the replication scenario on transactional tables. Marked xfail
  and not run; see the reason on the marker."""
  self.__run_event_based_replication_tests(transactional=True)
| |
def __run_event_based_replication_tests(self, transactional=True):
  """Replicates a source database into a target database with Hive's 'repl dump' /
  'repl load' and checks, after each kind of write done from Impala on the source,
  that the replicated tables contain the expected number of rows.

  Cases: insert, insert into existing partitions, insert overwrite, CTAS and truncate.
  'transactional' is passed to __get_transactional_tblproperties() to build the table
  properties clause used in the create statements.
  """
  tbl_props = self.__get_transactional_tblproperties(transactional)
  source_db = self.__get_random_name("repl_source_")
  target_db = self.__get_random_name("repl_target_")
  unpartitioned_tbl = "unpart_tbl"
  partitioned_tbl = "part_tbl"

  def count_rows(db, tbl):
    """Returns count(*) of db.tbl as seen from Impala."""
    return int(self.execute_scalar(
      "select count(*) from {0}.{1}".format(db, tbl)).split('\t')[0])

  def target_counts():
    """Returns (unpartitioned, partitioned) row counts of the target tables."""
    unpart_rows = count_rows(target_db, unpartitioned_tbl)
    part_rows = count_rows(target_db, partitioned_tbl)
    return unpart_rows, part_rows

  def repl_dump():
    self.run_stmt_in_hive("repl dump {0}".format(source_db))

  def repl_load_and_sync():
    """Loads the dump into the target db, then waits for the events to catch up in
    case the repl command did some HMS operations."""
    self.run_stmt_in_hive("repl load {0} into {1}".format(source_db, target_db))
    EventProcessorUtils.wait_for_event_processing(self)

  def write_source_tables(verb):
    """Fills both source tables from Impala using 'verb' ('insert into' or
    'insert overwrite table')."""
    self.client.execute(
      "{0} {1}.{2} select * from functional.tinytable".format(
        verb, source_db, unpartitioned_tbl))
    self.client.execute(
      "{0} {1}.{2} partition(year,month)"
      " select * from functional_parquet.alltypessmall".format(
        verb, source_db, partitioned_tbl))

  try:
    self.run_stmt_in_hive("create database {0}".format(source_db))
    self.run_stmt_in_hive(
      "alter database {0} set dbproperties ('repl.source.for'='xyz')".format(source_db))
    # explicit create table command since create table like doesn't allow tblproperties
    self.client.execute("create table {0}.{1} (a string, b string) stored as parquet"
                        " {2}".format(source_db, unpartitioned_tbl, tbl_props))
    EventProcessorUtils.wait_for_event_processing(self)
    self.client.execute(
      "create table {0}.{1} (id int, bool_col boolean, tinyint_col tinyint, "
      "smallint_col smallint, int_col int, bigint_col bigint, float_col float, "
      "double_col double, date_string string, string_col string, "
      "timestamp_col timestamp) partitioned by (year int, month int) stored as parquet"
      " {2}".format(source_db, partitioned_tbl, tbl_props))

    # Case I: insert. Loading from Impala also creates the partitions.
    write_source_tables("insert into")
    rows_in_unpart_tbl = count_rows(source_db, unpartitioned_tbl)
    rows_in_part_tbl = count_rows(source_db, partitioned_tbl)
    assert rows_in_unpart_tbl > 0
    assert rows_in_part_tbl > 0
    # Bootstrap the replication; the target database is created between dump and load.
    repl_dump()
    self.client.execute("create database {0}".format(target_db))
    repl_load_and_sync()
    assert unpartitioned_tbl in self.client.execute(
      "show tables in {0}".format(target_db)).get_data()
    assert partitioned_tbl in self.client.execute(
      "show tables in {0}".format(target_db)).get_data()
    unpart_target, part_target = target_counts()
    assert rows_in_unpart_tbl == unpart_target
    assert rows_in_part_tbl == part_target

    # Case II: insert into existing partitions; the target row counts double.
    write_source_tables("insert into")
    repl_dump()
    repl_load_and_sync()
    unpart_target, part_target = target_counts()
    assert 2 * rows_in_unpart_tbl == unpart_target
    assert 2 * rows_in_part_tbl == part_target

    # Case III: insert overwrite from Impala; counts drop back to the initial values.
    write_source_tables("insert overwrite table")
    repl_dump()
    repl_load_and_sync()
    unpart_target, part_target = target_counts()
    assert rows_in_unpart_tbl == unpart_target
    assert rows_in_part_tbl == part_target

    # Case IV: CTAS, created with the same table properties clause.
    self.client.execute(
      "create table {0}.insertonly_nopart_ctas {1} as "
      "select * from {0}.{2}".format(source_db, tbl_props, unpartitioned_tbl))
    self.client.execute(
      "create table {0}.insertonly_part_ctas partitioned by (year, month) {1}"
      " as select * from {0}.{2}".format(source_db, tbl_props, partitioned_tbl))
    repl_dump()
    repl_load_and_sync()
    ctas_source_rows = count_rows(source_db, "insertonly_nopart_ctas")
    ctas_target_rows = count_rows(target_db, "insertonly_nopart_ctas")
    assert ctas_source_rows == ctas_target_rows
    ctas_source_rows = count_rows(source_db, "insertonly_part_ctas")
    ctas_target_rows = count_rows(target_db, "insertonly_part_ctas")
    assert ctas_source_rows == ctas_target_rows

    # Case V: truncate both source tables from Impala; replication must see that.
    self.client.execute("truncate table {0}.{1}".format(source_db, unpartitioned_tbl))
    self.client.execute("truncate table {0}.{1}".format(source_db, partitioned_tbl))
    repl_dump()
    repl_load_and_sync()
    unpart_target, part_target = target_counts()
    assert unpart_target == 0
    assert part_target == 0
  finally:
    # Look the databases up before dropping them so that their managed locations are
    # still known afterwards.
    source_db_obj = self.__get_db_nothrow(source_db)
    target_db_obj = self.__get_db_nothrow(target_db)
    if source_db_obj is not None:
      self.run_stmt_in_hive(
        "alter database {0} set dbproperties ('repl.source.for'='')".format(source_db))
      self.run_stmt_in_hive("drop database if exists {0} cascade".format(source_db))
    if target_db_obj is not None:
      self.run_stmt_in_hive("drop database if exists {0} cascade".format(target_db))
    # workaround for HIVE-24135. the managed db location doesn't get cleaned up
    if source_db_obj is not None and source_db_obj.managedLocationUri is not None:
      self.filesystem_client.delete_file_dir(source_db_obj.managedLocationUri, True)
    if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
      self.filesystem_client.delete_file_dir(target_db_obj.managedLocationUri, True)
| |
def __get_db_nothrow(self, name):
  """Returns the object produced by self.hive_client.get_database(name), or None if
  that call raises.

  This is a best-effort lookup (it is used from cleanup code), so errors from the
  client are swallowed. Only Exception subclasses are caught: KeyboardInterrupt and
  SystemExit must still propagate instead of being turned into a None result.
  """
  try:
    return self.hive_client.get_database(name)
  except Exception:
    return None
| |
def _run_test_empty_partition_events(self, unique_database, is_transactional):
  """Creates a partitioned parquet table in Hive, then adds and drops the empty
  partition year=2019 from Hive (each operation is repeated once in its
  'if [not] exists' form) and checks Impala's partition list after every step.

  unique_database: database in which the 'test_events' table is created.
  is_transactional: whether the table is created with the transactional table
    properties (see __get_transactional_tblproperties).
  """
  test_tbl = unique_database + ".test_events"
  TBLPROPERTIES = self.__get_transactional_tblproperties(is_transactional)
  self.run_stmt_in_hive("create table {0} (key string, value string) \
    partitioned by (year int) stored as parquet {1}".format(test_tbl, TBLPROPERTIES))
  EventProcessorUtils.wait_for_event_processing(self)
  self.client.execute("describe {0}".format(test_tbl))

  self.run_stmt_in_hive(
    "alter table {0} add partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self)
  assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')

  # Adding the same partition again with 'if not exists' must leave the partition
  # list unchanged and the event processor healthy.
  self.run_stmt_in_hive(
    "alter table {0} add if not exists partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self)
  assert [('2019',)] == self.get_impala_partition_info(test_tbl, 'year')
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  self.run_stmt_in_hive(
    "alter table {0} drop partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self)
  # The partition info rows are tuples (see the add-partition checks above), so the
  # membership test has to use a one-element tuple; a bare '2019' string would never
  # be found and the assertion would pass even if the partition were still there.
  assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"

  # Dropping the already dropped partition with 'if exists'.
  self.run_stmt_in_hive(
    "alter table {0} drop if exists partition (year=2019)".format(test_tbl))
  EventProcessorUtils.wait_for_event_processing(self)
  assert ('2019',) not in self.get_impala_partition_info(test_tbl, 'year')
  assert EventProcessorUtils.get_event_processor_status() == "ACTIVE"
| |
def __get_tbl_location(self, db_name, tbl_name):
  """Returns the 'sd.location' attribute of the table object that
  self.hive_client.get_table(db_name, tbl_name) returns. Asserts that a hive client
  is available."""
  client = self.hive_client
  assert client is not None
  table = client.get_table(db_name, tbl_name)
  return table.sd.location
| |
def __get_transactional_tblproperties(self, is_transactional):
  """
  Returns the 'tblproperties (...)' clause that marks a table as insert-only
  transactional when 'is_transactional' is truthy, otherwise an empty string.
  """
  if not is_transactional:
    return ""
  return "tblproperties ('transactional'='true'," \
         "'transactional_properties'='insert_only')"
| |
def __run_self_events_test(self, db_name, use_impala):
  """Runs the self-event statements in 'db_name', through Impala when 'use_impala' is
  true and through Hive otherwise, checking the event processor counters after each
  statement."""
  recover_tbl_name = self.__get_random_name("tbl_")
  # External table on top of functional.alltypes' location, so that one of the test
  # statements can later recover partitions on it.
  alltypes_location = self.__get_tbl_location("functional", "alltypes")
  self.client.execute(
    "create external table {0}.{1} like functional.alltypes location '{2}'".format(
      db_name, recover_tbl_name, alltypes_location))

  if not use_impala:
    for stmt in self.__get_hive_test_queries(db_name, recover_tbl_name):
      self.__exec_sql_and_check_selfevent_counter(stmt, use_impala)
    return

  queries = self.__get_impala_test_queries(db_name, recover_tbl_name)
  # The False group (creates, drops, no-ops) runs first: those statements are not
  # expected to bump the self-events-skipped counter, but they must still not cause
  # refreshes. The True group must additionally bump the counter.
  for expect_self_event in (False, True):
    for stmt in queries[expect_self_event]:
      self.__exec_sql_and_check_selfevent_counter(stmt, use_impala, expect_self_event)
| |
def __get_impala_test_queries(self, db_name, recover_tbl_name):
  """Builds the statements that the self-events test runs through Impala.

  Returns a dict with two ordered lists: key True holds statements expected to
  increment the self-events-skipped counter, key False holds statements that are not
  expected to. As a side effect two empty tables (one unpartitioned, one partitioned)
  are created in 'db_name' for the empty insert-overwrite statements.
  """
  tbl_name = self.__get_random_name("tbl_")
  acid_tbl_name = self.__get_random_name("acid_tbl_")
  acid_no_part_tbl_name = self.__get_random_name("acid_no_part_tbl_")
  tbl2 = self.__get_random_name("tbl_")
  view_name = self.__get_random_name("view_")
  empty_unpartitioned_tbl = self.__get_random_name("insert_test_tbl_")
  empty_partitioned_tbl = self.__get_random_name("insert_test_parttbl_")

  def sql(template, *args):
    """Formats 'template' with db_name as {0} and 'args' as {1}, {2}, ..."""
    return template.format(db_name, *args)

  # Empty tables used by the insert statements that generate no insert events.
  self.client.execute(sql("create table {0}.{1} (c1 int)", empty_unpartitioned_tbl))
  self.client.execute(
    sql("create table {0}.{1} (c1 int) partitioned by (part int)",
        empty_partitioned_tbl))
  acid_props = self.__get_transactional_tblproperties(True)

  # Statements which will increment the self-events-skipped counter.
  counted = [
    # ALTER_DATABASE case
    sql("comment on database {0} is 'self-event test database'"),
    sql("alter database {0} set owner user `test-user`"),
    sql("create function {0}.f() returns int location '{1}/libTestUdfs.so' "
        "symbol='NoArgs'", WAREHOUSE),
    sql("drop function {0}.f()"),
    # ALTER_TABLE case
    sql("alter table {0}.{1} set TBLPROPERTIES ('k'='v')", tbl_name),
    sql("alter table {0}.{1} ADD COLUMN c1 int", tbl_name),
    sql("alter table {0}.{1} ALTER COLUMN C1 set comment 'c1 comment'", tbl_name),
    sql("alter table {0}.{1} ADD COLUMNS (c2 int, c3 string)", tbl_name),
    sql("alter table {0}.{1} DROP COLUMN c1", tbl_name),
    sql("alter table {0}.{1} DROP COLUMN c2", tbl_name),
    sql("alter table {0}.{1} DROP COLUMN c3", tbl_name),
    sql("alter table {0}.{1} set owner user `test-user`", tbl_name),
    sql("alter table {0}.{1} set owner role `test-role`", tbl_name),
    sql("alter table {0}.{1} rename to {0}.{2}", tbl_name, tbl2),
    sql("alter view {0}.{1} set owner user `test-view-user`", view_name),
    sql("alter view {0}.{1} set owner role `test-view-role`", view_name),
    sql("alter view {0}.{1} rename to {0}.{2}", view_name,
        self.__get_random_name("view_")),
    # ADD_PARTITION cases
    # dynamic partition insert (creates new partitions)
    sql("insert into table {0}.{1} partition (year,month) "
        "select * from functional.alltypessmall", tbl2),
    # add partition
    sql("alter table {0}.{1} add if not exists partition (year=1111, month=1)",
        tbl2),
    # compute stats generates ALTER_PARTITION
    sql("compute stats {0}.{1}", tbl2),
    sql("alter table {0}.{1} recover partitions", recover_tbl_name),
    # insert into an existing partition; generates INSERT self-event
    sql("insert into table {0}.{1} partition "
        "(year, month) select * from functional.alltypessmall where year=2009 "
        "and month=1", tbl2),
    # insert overwrite from Impala also generates an INSERT self-event
    sql("insert overwrite table {0}.{1} partition "
        "(year, month) select * from functional.alltypessmall where year=2009 "
        "and month=1", tbl2),
  ]

  # Building blocks for the statements that appear more than once below.
  add_partition = sql(
    "alter table {0}.{1} add if not exists partition (year=2100, month=1)", tbl_name)
  drop_partition = sql(
    "alter table {0}.{1} drop if exists partition (year=2100, month=1)", tbl_name)
  acid_nopart_self_overwrite = sql(
    "insert overwrite table {0}.{1} select * from {0}.{1}", acid_no_part_tbl_name)
  acid_dynamic_insert = sql(
    "insert into table {0}.{1} partition (part) select id, int_col "
    "from functional.alltypestiny", acid_tbl_name)

  # Statements which will not increment the self-events-skipped counter.
  not_counted = [
    sql("create table {0}.{1} like functional.alltypessmall "
        "stored as parquet", tbl_name),
    sql("create view {0}.{1} as select * from functional.alltypessmall "
        "where year=2009", view_name),
    # the first add only makes sure that the second one is a no-op
    add_partition,
    add_partition,
    # DROP_PARTITION cases; the second drop targets a non-existing partition and is
    # essentially a no-op
    drop_partition,
    drop_partition,
    # empty table case where no insert events are generated
    sql("insert overwrite {0}.{1} select * from {0}.{1}", empty_unpartitioned_tbl),
    sql("insert overwrite {0}.{1} partition(part) select * from {0}.{1}",
        empty_partitioned_tbl),
    # in case of ACID tables no INSERT event is generated as the COMMIT event
    # contains the related data
    sql("create table {0}.{1} (c1 int) {2}", acid_no_part_tbl_name, acid_props),
    sql("insert into table {0}.{1} values (1) ", acid_no_part_tbl_name),
    acid_nopart_self_overwrite,
    sql("truncate table {0}.{1}", acid_no_part_tbl_name),
    # the table is empty so the following insert adds 0 rows
    acid_nopart_self_overwrite,
    sql("create table {0}.{1} (c1 int) partitioned by (part int) {2}",
        acid_tbl_name, acid_props),
    sql("insert into table {0}.{1} partition (part=1) "
        "values (1) ", acid_tbl_name),
    acid_dynamic_insert,
    # repeat the same insert, now it writes to existing partitions
    acid_dynamic_insert,
    # following insert overwrite is used instead of truncate, because truncate
    # leads to a non-self event that reloads the table
    sql("insert overwrite table {0}.{1} partition (part) select id, int_col "
        "from functional.alltypestiny where id=-1", acid_tbl_name),
    # the table is empty so the following inserts add 0 rows
    sql("insert overwrite table {0}.{1} partition (part) select id, int_col "
        "from functional.alltypestiny", acid_tbl_name),
    sql("insert overwrite {0}.{1} partition(part) select * from {0}.{1}",
        acid_tbl_name),
  ]
  return {True: counted, False: not_counted}
| |
def __get_hive_test_queries(self, db_name, recover_tbl_name):
  """Builds the ordered list of statements that the self-events test runs through
  Hive. As a side effect a partitioned table and a view are created in 'db_name' from
  Impala, and the table is refreshed."""
  tbl_name = self.__get_random_name("tbl_")
  tbl2 = self.__get_random_name("tbl_")
  view_name = self.__get_random_name("view_")

  def sql(template, *args):
    """Formats 'template' with db_name as {0} and 'args' as {1}, {2}, ..."""
    return template.format(db_name, *args)

  # A custom table schema makes it easier to change columns later in the statements.
  self.client.execute(sql("create table {0}.{1} (key int) partitioned by "
                          "(part int) stored as parquet", tbl_name))
  self.client.execute(
    sql("create view {0}.{1} as select * from functional.alltypessmall where year=2009",
        view_name))
  # events-processor only refreshes loaded tables, hence it is important to issue a
  # refresh here so that the table is in loaded state
  self.client.execute(sql("refresh {0}.{1}", tbl_name))

  # Statements that are issued twice in a row; the repeat is essentially a no-op.
  add_partition = sql(
    "alter table {0}.{1} add if not exists partition (part=1111)", tbl2)
  drop_partition = sql(
    "alter table {0}.{1} drop if exists partition (part=1111)", tbl2)

  return [
    # ALTER_DATABASE cases
    sql("alter database {0} set dbproperties ('comment'='self-event test database')"),
    sql("alter database {0} set owner user `test-user`"),
    # ALTER_TABLE case
    sql("alter table {0}.{1} set tblproperties ('k'='v')", tbl_name),
    sql("alter table {0}.{1} add columns (value string)", tbl_name),
    sql("alter table {0}.{1} set owner user `test-user`", tbl_name),
    sql("alter table {0}.{1} set owner role `test-role`", tbl_name),
    sql("alter table {0}.{1} rename to {0}.{2}", tbl_name, tbl2),
    sql("alter view {0}.{1} rename to {0}.{2}", view_name,
        self.__get_random_name("view_")),
    # need to set this config to make sure the dynamic partition insert works below
    "set hive.exec.dynamic.partition.mode=nonstrict",
    # ADD_PARTITION cases
    sql("insert into table {0}.{1} partition (part=2009) "
        "select id as key, string_col as value from functional.alltypessmall", tbl2),
    add_partition,
    add_partition,
    # DROP_PARTITION cases
    drop_partition,
    drop_partition,
    # compute stats generates ALTER_PARTITION
    sql("analyze table {0}.{1} compute statistics for columns", tbl2),
    sql("msck repair table {0}.{1}", recover_tbl_name),
  ]
| |
@staticmethod
def __get_self_event_metrics():
  """
  Reads three event processor metrics and returns them as ints, in the order
  (self-events-skipped, tables-refreshed, partitions-refreshed). Each metric is
  requested with 0 as the second argument to get_event_processor_metric().
  """
  fetch = EventProcessorUtils.get_event_processor_metric
  tables_refreshed = fetch('tables-refreshed', 0)
  partitions_refreshed = fetch('partitions-refreshed', 0)
  self_events_skipped = fetch('self-events-skipped', 0)
  ordered = (self_events_skipped, tables_refreshed, partitions_refreshed)
  return tuple(int(value) for value in ordered)
| |
def __exec_sql_and_check_selfevent_counter(self, stmt, use_impala_client,
    check_self_event_counter=True):
  """
  Runs 'stmt' through the Impala client when 'use_impala_client' is true, otherwise
  through Hive, waits for event processing and compares the self-event metrics taken
  before and after.

  Impala: tables-refreshed and partitions-refreshed must be unchanged, and
  self-events-skipped must have grown unless 'check_self_event_counter' is False.
  Hive: self-events-skipped must be unchanged.
  """
  before = self.__get_self_event_metrics()
  if use_impala_client:
    self.client.execute(stmt)
  else:
    self.run_stmt_in_hive(stmt)
  EventProcessorUtils.wait_for_event_processing(self)
  after = self.__get_self_event_metrics()

  skipped_before, tables_before, partitions_before = before
  skipped_after, tables_after, partitions_after = after

  if not use_impala_client:
    # Events caused by statements run from Hive must not be deemed self-events.
    assert skipped_before == skipped_after
    return

  # Any event caused by a statement run from the Impala client is assumed to be a
  # self-event.
  if check_self_event_counter:
    assert skipped_after > skipped_before
  # A self-event must not refresh any table or partition.
  assert tables_before == tables_after
  assert partitions_before == partitions_after
| |
@staticmethod
def __get_random_name(prefix=''):
  """
  Returns 'prefix' followed by five random lowercase ASCII letters; used to build
  unique database and table names.
  """
  letters = [random.choice(string.ascii_lowercase) for _ in range(5)]
  return prefix + ''.join(letters)