| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.util.event_processor_utils import EventProcessorUtils |
| |
# Query options handed to Impala clients through set_configuration() by the tests in
# this module.
# NOTE(review): presumably these make a query wait (up to 30 seconds) for pending HMS
# events to be applied before it is planned, and fail instead of proceeding when the
# wait times out -- confirm against the Impala query option documentation.
EVENT_SYNC_QUERY_OPTIONS = dict(
  sync_hms_events_wait_time_s=30,
  sync_hms_events_strict_mode=True,
)
| |
| |
class TestEventProcessingBase(ImpalaTestSuite):
  """Base class holding reusable bodies of event-processing tests.

  The '_run_*_impl' classmethods receive the calling test suite as 'suite' and issue
  statements through both Hive (run_stmt_in_hive) and an Impala client created from
  that suite.
  """

  @classmethod
  def setup_class(cls):
    super(TestEventProcessingBase, cls).setup_class()

  @classmethod
  def _run_test_insert_events_impl(cls, suite, unique_database, is_transactional=False):
    """Test for insert event processing. Events are created in Hive and processed in
    Impala. The following cases are tested :
    Insert into table --> for partitioned and non-partitioned table
    Insert overwrite table --> for partitioned and non-partitioned table
    Insert into partition --> for partitioned table

    'suite' is the test suite used to create the Impala client, 'unique_database' is
    the database in which the test tables are created and 'is_transactional' selects
    insert-only transactional tables instead of plain ones.
    """
    # TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
    # Reuse the shared helper instead of duplicating the tblproperties clause here.
    tblproperties = cls._get_transactional_tblproperties(is_transactional)
    with suite.create_impala_client() as impala_client:
      # Test table with no partitions.
      tbl_insert_nopart = 'tbl_insert_nopart'
      suite.run_stmt_in_hive(
        "drop table if exists %s.%s" % (unique_database, tbl_insert_nopart))
      cls.run_stmt_in_hive("create table %s.%s (id int, val int) %s"
        % (unique_database, tbl_insert_nopart, tblproperties))
      impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
      # Test CTAS and insert by Impala with empty results (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
        "create table {db}.ctas_tbl {prop} as select * from {db}.{tbl}"
        .format(db=unique_database, tbl=tbl_insert_nopart, prop=tblproperties))
      cls.execute_query_expect_success(impala_client,
        "insert into {db}.ctas_tbl select * from {db}.{tbl}"
        .format(db=unique_database, tbl=tbl_insert_nopart))
      # Test insert into table, this will fire an insert event.
      cls.run_stmt_in_hive("insert into %s.%s values(101, 200)"
        % (unique_database, tbl_insert_nopart))
      # With MetastoreEventProcessor running, the insert event will be processed. Query
      # the table from Impala. Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
        (unique_database, tbl_insert_nopart))
      assert data.split('\t') == ['101', '200']

      # Test insert overwrite. Overwrite the existing value.
      cls.run_stmt_in_hive("insert overwrite table %s.%s values(101, 201)"
        % (unique_database, tbl_insert_nopart))
      # Make sure the event has been processed using sync_hms_events_wait_time_s.
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client, "select * from %s.%s" %
        (unique_database, tbl_insert_nopart))
      assert data.split('\t') == ['101', '201']
      # Test insert overwrite by Impala with empty results (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
        "insert overwrite {db}.{tbl} select * from {db}.ctas_tbl"
        .format(db=unique_database, tbl=tbl_insert_nopart))
      result = cls.execute_query_expect_success(impala_client,
        "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_nopart))
      assert len(result.data) == 0

      # Test partitioned table.
      tbl_insert_part = 'tbl_insert_part'
      cls.run_stmt_in_hive("drop table if exists %s.%s"
        % (unique_database, tbl_insert_part))
      cls.run_stmt_in_hive("create table %s.%s (id int, name string) "
        "partitioned by(day int, month int, year int) %s"
        % (unique_database, tbl_insert_part, tblproperties))
      # Test CTAS and insert by Impala with empty results (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
        "create table {db}.ctas_part partitioned by (day, month, year) {prop} as "
        "select * from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part,
          prop=tblproperties))
      cls.execute_query_expect_success(impala_client,
        "insert into {db}.ctas_part partition(day=0, month=0, year=0) select id, "
        "name from {db}.{tbl}".format(db=unique_database, tbl=tbl_insert_part))
      # Insert data into partitions.
      cls.run_stmt_in_hive(
        "insert into %s.%s partition(day=28, month=03, year=2019)"
        "values(101, 'x')" % (unique_database, tbl_insert_part))
      # Make sure the event has been processed using sync_hms_events_wait_time_s.
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client,
        "select * from %s.%s" % (unique_database, tbl_insert_part))
      assert data.split('\t') == ['101', 'x', '28', '3', '2019']

      # Test inserting into existing partitions.
      cls.run_stmt_in_hive(
        "insert into %s.%s partition(day=28, month=03, year=2019)"
        "values(102, 'y')" % (unique_database, tbl_insert_part))
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client,
        "select count(*) from %s.%s where day=28 and month=3 "
        "and year=2019" % (unique_database, tbl_insert_part))
      assert data.split('\t') == ['2']
      # Test inserting into existing partitions by Impala with empty results
      # (IMPALA-10765).
      cls.execute_query_expect_success(impala_client,
        "insert into {db}.{tbl} partition(day=28, month=03, year=2019) "
        "select id, name from {db}.ctas_part"
        .format(db=unique_database, tbl=tbl_insert_part))

      # Test insert overwrite into existing partitions
      cls.run_stmt_in_hive(
        "insert overwrite table %s.%s partition(day=28, month=03, "
        "year=2019)" "values(101, 'z')" % (unique_database, tbl_insert_part))
      # Verify that the data is present in Impala.
      data = cls.execute_scalar_expect_success(impala_client,
        "select * from %s.%s where day=28 and month=3 and"
        " year=2019 and id=101" % (unique_database, tbl_insert_part))
      assert data.split('\t') == ['101', 'z', '28', '3', '2019']
      # The remaining statements are issued without the event-sync query options.
      impala_client.clear_configuration()
      # Test insert overwrite into existing partitions by Impala with empty results
      # (IMPALA-10765).
      cls.execute_query_expect_success(impala_client, "insert overwrite {db}.{tbl} "
        "partition(day=28, month=03, year=2019) "
        "select id, name from {db}.ctas_part"
        .format(db=unique_database, tbl=tbl_insert_part))
      result = cls.execute_query_expect_success(impala_client, "select * from {db}.{tbl} "
        "where day=28 and month=3 and year=2019"
        .format(db=unique_database, tbl=tbl_insert_part))
      assert len(result.data) == 0

  @classmethod
  def _run_event_based_replication_tests_impl(cls, suite,
      filesystem_client, transactional=True):
    """Hive Replication relies on the insert events generated on the tables.
    This test issues some basic replication commands from Hive and makes sure
    that the replicated table has correct data.

    'suite' is the test suite used to create the Impala client and to wait for event
    processing, 'filesystem_client' is used to delete the managed locations of the
    databases during cleanup and 'transactional' selects insert-only transactional
    source tables. The source and target databases get random names and are dropped
    again even when a replication case fails."""
    # TODO: change into an instance method and remove argument "suite" (IMPALA-14174)
    tblproperties = cls._get_transactional_tblproperties(transactional)
    source_db = ImpalaTestSuite.get_random_name("repl_source_")
    target_db = ImpalaTestSuite.get_random_name("repl_target_")
    # The client is managed by 'with' (same pattern as _run_test_insert_events_impl) so
    # that it is released even if the database cleanup below raises.
    with suite.create_impala_client() as impala_client:
      try:
        cls.__run_replication_cases(
          suite, impala_client, source_db, target_db, tblproperties)
      finally:
        cls.__drop_replication_dbs(filesystem_client, source_db, target_db)

  @classmethod
  def __run_replication_cases(cls, suite, impala_client, source_db, target_db,
      tblproperties):
    """Creates 'source_db' with one unpartitioned and one partitioned table and checks
    that 'target_db' matches it after each replicated change (cases I to V)."""
    unpartitioned_tbl = "unpart_tbl"
    partitioned_tbl = "part_tbl"
    cls.run_stmt_in_hive("create database {0}".format(source_db))
    cls.run_stmt_in_hive(
      "alter database {0} set dbproperties ('repl.source.for'='xyz')"
      .format(source_db))
    impala_client.set_configuration(EVENT_SYNC_QUERY_OPTIONS)
    # explicit create table command since create table like doesn't allow tblproperties
    impala_client.execute("create table {0}.{1} (a string, b string) stored as parquet"
      " {2}".format(source_db, unpartitioned_tbl, tblproperties))
    impala_client.execute(
      "create table {0}.{1} (id int, bool_col boolean, tinyint_col tinyint, "
      "smallint_col smallint, int_col int, bigint_col bigint, float_col float, "
      "double_col double, date_string string, string_col string, "
      "timestamp_col timestamp) partitioned by (year int, month int) stored as parquet"
      " {2}".format(source_db, partitioned_tbl, tblproperties))

    # case I: insert
    # load the table with some data from impala, this also creates new partitions.
    impala_client.execute("insert into {0}.{1}"
      " select * from functional.tinytable".format(source_db,
        unpartitioned_tbl))
    impala_client.execute("insert into {0}.{1} partition(year,month)"
      " select * from functional_parquet.alltypessmall".format(
        source_db, partitioned_tbl))
    rows_in_unpart_tbl = cls.__count_rows(impala_client, source_db, unpartitioned_tbl)
    rows_in_part_tbl = cls.__count_rows(impala_client, source_db, partitioned_tbl)
    assert rows_in_unpart_tbl > 0
    assert rows_in_part_tbl > 0
    # bootstrap the replication
    cls.run_stmt_in_hive("repl dump {0}".format(source_db))
    # create a target database where tables will be replicated
    impala_client.execute("create database {0}".format(target_db))
    # replicate the table from source to target
    cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
      target_db))
    assert unpartitioned_tbl in impala_client.execute(
      "show tables in {0}".format(target_db)).get_data()
    assert partitioned_tbl in impala_client.execute(
      "show tables in {0}".format(target_db)).get_data()
    # confirm the number of rows in target match with the source table.
    rows_in_unpart_tbl_target = cls.__count_rows(
      impala_client, target_db, unpartitioned_tbl)
    rows_in_part_tbl_target = cls.__count_rows(
      impala_client, target_db, partitioned_tbl)
    assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
    assert rows_in_part_tbl == rows_in_part_tbl_target

    # case II: insert into existing partitions.
    impala_client.execute("insert into {0}.{1}"
      " select * from functional.tinytable".format(
        source_db, unpartitioned_tbl))
    impala_client.execute("insert into {0}.{1} partition(year,month)"
      " select * from functional_parquet.alltypessmall".format(
        source_db, partitioned_tbl))
    # replicate the table from source to target
    cls.__repl_dump_and_load(source_db, target_db)
    # confirm the number of rows in target match with the source table.
    rows_in_unpart_tbl_target = cls.__count_rows(
      impala_client, target_db, unpartitioned_tbl)
    rows_in_part_tbl_target = cls.__count_rows(
      impala_client, target_db, partitioned_tbl)
    assert 2 * rows_in_unpart_tbl == rows_in_unpart_tbl_target
    assert 2 * rows_in_part_tbl == rows_in_part_tbl_target

    # Case III: insert overwrite
    # impala does a insert overwrite of the tables.
    impala_client.execute("insert overwrite table {0}.{1}"
      " select * from functional.tinytable".format(
        source_db, unpartitioned_tbl))
    impala_client.execute("insert overwrite table {0}.{1} partition(year,month)"
      " select * from functional_parquet.alltypessmall".format(
        source_db, partitioned_tbl))
    # replicate the table from source to target
    cls.__repl_dump_and_load(source_db, target_db)
    # we wait (20sec) until the events catch up in case repl command above did some HMS
    # operations.
    EventProcessorUtils.wait_for_event_processing(suite, timeout=20)
    # confirm the number of rows in target match with the source table.
    rows_in_unpart_tbl_target = cls.__count_rows(
      impala_client, target_db, unpartitioned_tbl)
    rows_in_part_tbl_target = cls.__count_rows(
      impala_client, target_db, partitioned_tbl)
    assert rows_in_unpart_tbl == rows_in_unpart_tbl_target
    assert rows_in_part_tbl == rows_in_part_tbl_target

    # Case IV: CTAS which creates a transactional table.
    impala_client.execute(
      "create table {0}.insertonly_nopart_ctas {1} as "
      "select * from {0}.{2}".format(source_db, tblproperties, unpartitioned_tbl))
    impala_client.execute(
      "create table {0}.insertonly_part_ctas partitioned by (year, month) {1}"
      " as select * from {0}.{2}".format(source_db, tblproperties, partitioned_tbl))
    # replicate the table from source to target
    cls.__repl_dump_and_load(source_db, target_db)
    # confirm the number of rows in target match with the source table.
    rows_in_unpart_tbl_source = cls.__count_rows(
      impala_client, source_db, "insertonly_nopart_ctas")
    rows_in_unpart_tbl_target = cls.__count_rows(
      impala_client, target_db, "insertonly_nopart_ctas")
    assert rows_in_unpart_tbl_source == rows_in_unpart_tbl_target
    rows_in_part_tbl_source = cls.__count_rows(
      impala_client, source_db, "insertonly_part_ctas")
    rows_in_part_tbl_target = cls.__count_rows(
      impala_client, target_db, "insertonly_part_ctas")
    assert rows_in_part_tbl_source == rows_in_part_tbl_target

    # Case V: truncate table
    # impala truncates both the tables. Make sure replication sees that.
    impala_client.execute("truncate table {0}.{1}".format(source_db,
      unpartitioned_tbl))
    impala_client.execute("truncate table {0}.{1}".format(source_db, partitioned_tbl))
    # replicate the table from source to target
    cls.__repl_dump_and_load(source_db, target_db)
    # confirm the number of rows in target match with the source table.
    rows_in_unpart_tbl_target = cls.__count_rows(
      impala_client, target_db, unpartitioned_tbl)
    rows_in_part_tbl_target = cls.__count_rows(
      impala_client, target_db, partitioned_tbl)
    assert rows_in_unpart_tbl_target == 0
    assert rows_in_part_tbl_target == 0

  @classmethod
  def __drop_replication_dbs(cls, filesystem_client, source_db, target_db):
    """Drops whichever of 'source_db' and 'target_db' exist and deletes their managed
    locations through 'filesystem_client'."""
    # Look both databases up before dropping them: their managed locations are still
    # needed once the databases are gone.
    src_db = cls.__get_db_nothrow(source_db)
    target_db_obj = cls.__get_db_nothrow(target_db)
    if src_db is not None:
      # Reset the 'repl.source.for' property set during setup before dropping.
      cls.run_stmt_in_hive(
        "alter database {0} set dbproperties ('repl.source.for'='')".format(source_db))
      cls.run_stmt_in_hive("drop database if exists {0} cascade"
        .format(source_db))
    if target_db_obj is not None:
      cls.run_stmt_in_hive("drop database if exists {0} cascade"
        .format(target_db))
    # workaround for HIVE-24135. the managed db location doesn't get cleaned up
    if src_db is not None and src_db.managedLocationUri is not None:
      filesystem_client.delete_file_dir(src_db.managedLocationUri,
        True)
    if target_db_obj is not None and target_db_obj.managedLocationUri is not None:
      filesystem_client.delete_file_dir(
        target_db_obj.managedLocationUri, True)

  @classmethod
  def __repl_dump_and_load(cls, source_db, target_db):
    """Runs Hive 'repl dump' on 'source_db' followed by 'repl load' into 'target_db'."""
    cls.run_stmt_in_hive("repl dump {0}".format(source_db))
    cls.run_stmt_in_hive("repl load {0} into {1}".format(source_db,
      target_db))

  @classmethod
  def __count_rows(cls, impala_client, db_name, tbl_name):
    """Returns the result of 'select count(*)' on db_name.tbl_name, queried through
    'impala_client', as an int."""
    scalar = cls.execute_scalar_expect_success(impala_client,
      "select count(*) from {0}.{1}".format(db_name, tbl_name))
    return int(scalar.split('\t')[0])

  @classmethod
  def __get_db_nothrow(cls, name):
    """Returns the database object that the Hive client reports for 'name', or None if
    the lookup raises."""
    try:
      return cls.hive_client.get_database(name)
    except Exception:
      # Deliberately best effort: callers treat None as "nothing to clean up".
      return None

  @classmethod
  def _get_transactional_tblproperties(cls, is_transactional):
    """
    Util method to generate the tblproperties for transactional tables. Returns the
    tblproperties clause of an insert-only transactional table if 'is_transactional'
    is truthy and an empty string otherwise.
    """
    tblproperties = ""
    if is_transactional:
      tblproperties = "tblproperties ('transactional'='true'," \
          "'transactional_properties'='insert_only')"
    return tblproperties