| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import pytest |
| import random |
| import time |
| import traceback |
| |
| from multiprocessing import Value |
| from multiprocessing.pool import ThreadPool |
| |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.common.parametrize import UniqueDatabase |
| from tests.common.skip import SkipIfHive2, SkipIfS3 |
| |
# Controls how many INSERT OVERWRITE rounds the writers in TestAcidInsertsBasic perform.
NUM_OVERWRITES = 2
# Number of single-row INSERT INTO statements issued after each INSERT OVERWRITE.
NUM_INSERTS_PER_OVERWRITE = 4
| |
| |
class Task:
  """Helper class for parallel execution: a callable bundled with the arguments it
  should be called with. A failure is printed with its traceback to stderr before being
  re-raised, so it is visible even when the task runs on a pool thread."""
  def __init__(self, func, *args, **kwargs):
    self.func, self.args, self.kwargs = func, args, kwargs

  def run(self):
    """Calls 'func' with the stored positional and keyword arguments and returns
    whatever it returns."""
    call, positional, keyword = self.func, self.args, self.kwargs
    try:
      outcome = call(*positional, **keyword)
    except Exception:
      traceback.print_exc()
      raise
    return outcome
| |
| |
def run_tasks(tasks, timeout_seconds=600):
  """Runs every Task in 'tasks' concurrently, one thread per task, and waits for all.

  Returns the tasks' return values in the order of 'tasks' (an empty list if 'tasks' is
  empty). If a task raises, its exception is re-raised here. If the tasks do not finish
  within 'timeout_seconds', multiprocessing.TimeoutError is raised.
  """
  if not tasks:
    # ThreadPool(processes=0) raises ValueError, and there is nothing to wait for.
    return []
  pool = ThreadPool(processes=len(tasks))
  try:
    return pool.map_async(Task.run, tasks).get(timeout_seconds)
  finally:
    # Release the pool instead of leaking its threads on every call. close() does not
    # block: workers exit once their current task is done, so a timed-out call does not
    # hang here waiting for stuck tasks.
    pool.close()
| |
| |
class TestAcidStress(ImpalaTestSuite):
  """Common base of the ACID stress tests: 'targeted-stress' workload, restricted to
  uncompressed text tables."""
  @classmethod
  def get_workload(cls):
    return 'targeted-stress'

  @classmethod
  def add_test_dimensions(cls):
    super(TestAcidStress, cls).add_test_dimensions()
    # These tests could be moved to the exhaustive tests due to their long execution
    # time. However, they only run with Hive3, where the main goal currently is to make
    # ACID work, so it is better to run them frequently.
    cls.ImpalaTestMatrix.add_constraint(
        lambda v: (v.get_value('table_format').file_format == 'text' and
                   v.get_value('table_format').compression_codec == 'none'))
| |
| |
class TestAcidInsertsBasic(TestAcidStress):
  """A single writer (Hive or Impala) repeatedly overwrites and appends to an insert-only
  ACID table while an Impala reader checks that every snapshot it sees is consistent."""
  @classmethod
  def get_workload(cls):
    return super(TestAcidInsertsBasic, cls).get_workload()

  @classmethod
  def add_test_dimensions(cls):
    super(TestAcidInsertsBasic, cls).add_test_dimensions()

  def _verify_result(self, result, expected_result):
    """Verify invariants for 'run' and 'i' in one snapshot of the table.

    'result.data' holds tab-separated "run<TAB>i" rows. 'expected_result' is a dict with
    keys "run" and "i" describing the latest state seen by earlier calls. It is updated
    in place, so later snapshots are checked against it. Checked invariants:
    - 'run' never decreases (data overwritten in the past must stay invisible),
    - all rows belong to a single run, because INSERT OVERWRITE replaces every row,
    - within that run 'i' has all values from 0 to max_i, and max_i never decreases.
    """
    assert len(result.data) > 0
    rows = [list(map(int, line.split('\t'))) for line in result.data]
    runs = set(run for run, _ in rows)
    i_list = sorted(i for _, i in rows)
    run_max = max(runs)
    assert expected_result["run"] <= run_max  # shouldn't see data overwritten in the past
    assert len(runs) == 1, "Rows of several overwrites are visible: %s" % sorted(runs)
    if expected_result["run"] < run_max:
      # A newer overwrite became visible: restart tracking 'i' for that run. Its rows
      # are still checked below instead of being skipped.
      expected_result["run"] = run_max
      expected_result["i"] = 0
    assert i_list[-1] >= expected_result["i"]
    # 'i' should have all values from 0 to max_i.
    assert i_list == list(range(i_list[-1] + 1))
    expected_result["i"] = i_list[-1]

  def _hive_role_write_inserts(self, tbl_name, partitioned):
    """INSERT INTO/OVERWRITE a table several times from Hive.

    Round 'run' (0 <= run < NUM_OVERWRITES) overwrites the table (or partition p=1) with
    the row (run, 0), then appends (run, 1) .. (run, NUM_INSERTS_PER_OVERWRITE), one row
    per statement."""
    part_expr = "partition (p=1)" if partitioned else ""
    for run in xrange(0, NUM_OVERWRITES):
      OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
          """ % (tbl_name, part_expr, run, 0)
      self.run_stmt_in_hive(OVERWRITE_SQL)
      for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
        INSERT_SQL = """insert into table %s %s values (%i, %i)
            """ % (tbl_name, part_expr, run, i)
        self.run_stmt_in_hive(INSERT_SQL)

  def _impala_role_write_inserts(self, tbl_name, partitioned):
    """INSERT INTO/OVERWRITE a table several times from Impala.

    Writes the same sequence as _hive_role_write_inserts(), so both writers end in the
    state _impala_role_read_inserts() waits for."""
    # Created outside the 'try' so that a failed connect is not masked by an
    # UnboundLocalError from the 'finally' clause.
    impalad_client = ImpalaTestSuite.create_impala_client()
    try:
      part_expr = "partition (p=1)" if partitioned else ""
      for run in xrange(0, NUM_OVERWRITES):
        OVERWRITE_SQL = """insert overwrite table %s %s values (%i, %i)
            """ % (tbl_name, part_expr, run, 0)
        impalad_client.execute(OVERWRITE_SQL)
        for i in xrange(1, NUM_INSERTS_PER_OVERWRITE + 1):
          INSERT_SQL = """insert into table %s %s values (%i, %i)
              """ % (tbl_name, part_expr, run, i)
          impalad_client.execute(INSERT_SQL)
    finally:
      impalad_client.close()

  def _impala_role_read_inserts(self, tbl_name, needs_refresh, sleep_seconds,
                                timeout_seconds=600):
    """SELECT from a table many times until the expected final values are found.

    The final state is the last overwrite round (run == NUM_OVERWRITES - 1) with all
    NUM_INSERTS_PER_OVERWRITE inserts of that round visible. Every non-empty snapshot
    read along the way is checked with _verify_result(). Fails if the final state is
    not reached within 'timeout_seconds', so the reader cannot spin forever when the
    writer dies."""
    final_result = {"run": NUM_OVERWRITES - 1, "i": NUM_INSERTS_PER_OVERWRITE}
    impalad_client = ImpalaTestSuite.create_impala_client()
    try:
      expected_result = {"run": -1, "i": 0}
      accept_empty_table = True
      deadline = time.time() + timeout_seconds
      # Keep polling until BOTH the last run AND its last insert have been observed.
      while expected_result != final_result:
        assert time.time() < deadline, "Timed out waiting for %s, last seen: %s" % (
            final_result, expected_result)
        time.sleep(sleep_seconds)
        if needs_refresh:
          impalad_client.execute("refresh %s" % tbl_name)
        result = impalad_client.execute("select run, i from %s" % tbl_name)
        if len(result.data) == 0:
          # Only allowed before the first overwrite became visible.
          assert accept_empty_table
          continue
        accept_empty_table = False
        self._verify_result(result, expected_result)
    finally:
      impalad_client.close()

  def _create_table(self, full_tbl_name, partitioned):
    """Creates test table with name 'full_tbl_name'. Table is partitioned if
    'partitioned' is set to True."""
    part_expr = "partitioned by (p int)" if partitioned else ""

    CREATE_SQL = """create table %s (run int, i int) %s TBLPROPERTIES (
         'transactional_properties' = 'insert_only', 'transactional' = 'true')
         """ % (full_tbl_name, part_expr)
    self.client.execute("drop table if exists %s" % full_tbl_name)
    self.client.execute(CREATE_SQL)

  def _run_test_read_hive_inserts(self, unique_database, partitioned):
    """Check that Impala can read a single insert only ACID table (over)written by Hive
    several times. Consistency can be checked by using incremental values for
    overwrites ('run') and inserts ('i').
    """
    tbl_name = "%s.test_read_hive_inserts" % unique_database
    self._create_table(tbl_name, partitioned)

    run_tasks([
        Task(self._hive_role_write_inserts, tbl_name, partitioned),
        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=True,
           sleep_seconds=3)])

  def _run_test_read_impala_inserts(self, unique_database, partitioned):
    """Check that Impala can read a single insert only ACID table (over)written by
    Impala several times. Consistency can be checked by using incremental values for
    overwrites ('run') and inserts ('i').
    """
    tbl_name = "%s.test_read_impala_inserts" % unique_database
    self._create_table(tbl_name, partitioned)

    run_tasks([
        Task(self._impala_role_write_inserts, tbl_name, partitioned),
        Task(self._impala_role_read_inserts, tbl_name, needs_refresh=False,
           sleep_seconds=0.1)])

  @SkipIfHive2.acid
  @SkipIfS3.hive
  @pytest.mark.execute_serially
  @pytest.mark.stress
  def test_read_hive_inserts(self, unique_database):
    """Check that Impala can read partitioned and non-partitioned ACID tables
    written by Hive."""
    for is_partitioned in [False, True]:
      self._run_test_read_hive_inserts(unique_database, is_partitioned)

  @SkipIfHive2.acid
  @pytest.mark.execute_serially
  @pytest.mark.stress
  def test_read_impala_inserts(self, unique_database):
    """Check that Impala can read partitioned and non-partitioned ACID tables
    written by Impala."""
    for is_partitioned in [False, True]:
      self._run_test_read_impala_inserts(unique_database, is_partitioned)
| |
| |
class TestConcurrentAcidInserts(TestAcidStress):
  """Concurrent INSERTs (and occasional TRUNCATEs) against several impalads, while
  checkers verify that each writer's rows always form a continuous integer range."""
  @classmethod
  def get_workload(cls):
    return super(TestConcurrentAcidInserts, cls).get_workload()

  @classmethod
  def add_test_dimensions(cls):
    super(TestConcurrentAcidInserts, cls).add_test_dimensions()

  def _impala_role_concurrent_writer(self, tbl_name, wid, counter):
    """Writes ascending numbers into column 'i'. To column 'wid' it writes its identifier
    passed in parameter 'wid'. Occasionally it truncates the table.

    'counter' is incremented when the writer finishes. This also happens if it fails,
    including failing to connect, so that the checkers always get to stop."""
    impalad_client = None
    try:
      target_impalad = wid % ImpalaTestSuite.get_impalad_cluster_size()
      impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
      for num_inserts in xrange(50):
        impalad_client.execute("insert into table %s values (%i, %i)" % (
            tbl_name, wid, num_inserts))
        # Roughly 5% of the inserts are followed by a truncate.
        if random.randint(0, 100) < 5:
          impalad_client.execute("truncate table %s" % tbl_name)
    finally:
      with counter.get_lock():
        counter.value += 1
      if impalad_client is not None:
        impalad_client.close()

  def _impala_role_concurrent_checker(self, tbl_name, cid, counter, writers):
    """Checks if the table's invariant is true. The invariant is that for each
    'wid' column 'i' should store a continuous integer range.

    Polls until all 'writers' have incremented 'counter'. The last check always runs
    after every writer has finished."""
    def verify_result_set(result):
      wid_to_values = dict()
      for line in result.data:
        wid, i = map(int, line.split('\t'))
        wid_to_values.setdefault(wid, []).append(i)
      for wid, values in wid_to_values.items():
        sorted_values = sorted(values)
        expected_values = list(range(sorted_values[0], sorted_values[-1] + 1))
        assert sorted_values == expected_values, "wid: %d" % wid

    target_impalad = cid % ImpalaTestSuite.get_impalad_cluster_size()
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(target_impalad)
    try:
      while True:
        # Sample the counter before querying. If all writers were already done, this
        # query sees their final state, and the loop stops after verifying it.
        writers_done = counter.value == writers
        verify_result_set(impalad_client.execute("select * from %s" % tbl_name))
        if writers_done:
          break
        time.sleep(0.5)
    finally:
      impalad_client.close()

  @SkipIfHive2.acid
  @pytest.mark.execute_serially
  @pytest.mark.stress
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_concurrent_inserts(self, unique_database):
    """Issues INSERT statements against multiple impalads in a way that some
    invariants must be true when a spectator process inspects the table. E.g.
    if the table contains continuous ranges of integers."""
    tbl_name = "%s.test_concurrent_inserts" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("drop table if exists %s" % tbl_name)
    self.client.execute("""create table %s (wid int, i int) TBLPROPERTIES (
        'transactional_properties' = 'insert_only', 'transactional' = 'true')
        """ % tbl_name)

    counter = Value('i', 0)
    num_writers = 6
    num_checkers = 3

    writers = [Task(self._impala_role_concurrent_writer, tbl_name, i, counter)
               for i in xrange(0, num_writers)]
    checkers = [Task(self._impala_role_concurrent_checker, tbl_name, i, counter,
                     num_writers)
                for i in xrange(0, num_checkers)]
    run_tasks(writers + checkers)
| |
| |
class TestFailingAcidInserts(TestAcidStress):
  """INSERTs that fail through a debug action must never become visible to readers."""
  @classmethod
  def get_workload(cls):
    return super(TestFailingAcidInserts, cls).get_workload()

  @classmethod
  def add_test_dimensions(cls):
    super(TestFailingAcidInserts, cls).add_test_dimensions()

  def _impala_role_insert(self, tbl_name, partitioned, target_impalad, counter):
    """Inserts data to table 'tbl_name'. INSERTs with the value 1 should succeed, while
    INSERTs with the value -1 must fail with a debug action.
    Occasionally it truncates the table.

    'target_impalad' is taken modulo the cluster size. 'counter' is incremented when the
    writer finishes. This also happens on failure, so that the checkers always stop."""
    FAIL_ACTION = "CLIENT_REQUEST_UPDATE_CATALOG:FAIL@1.0"
    impalad_client = None
    try:
      impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(
          target_impalad % ImpalaTestSuite.get_impalad_cluster_size())
      num_inserts = 0
      # Only successful INSERTs are counted, so failing ones do not shorten the run.
      while num_inserts < 50:
        should_succeed = random.randint(0, 99) < 50
        val = 1 if should_succeed else -1
        impalad_client.set_configuration_option(
            "DEBUG_ACTION", "" if should_succeed else FAIL_ACTION)
        part_expr = "partition (p=%d)" % random.randint(0, 3) if partitioned else ""
        try:
          impalad_client.execute(
              "insert into table %s %s values (%i)" % (tbl_name, part_expr, val))
          num_inserts += 1
          if random.randint(0, 100) < 5:
            impalad_client.execute("truncate table %s" % tbl_name)
        except Exception as e:
          # Only the failure injected by the debug action is expected. Bare 'raise'
          # keeps the original traceback, which 'raise e' loses on Python 2.
          if should_succeed or "CLIENT_REQUEST_UPDATE_CATALOG" not in str(e):
            raise
    finally:
      with counter.get_lock():
        counter.value += 1
      if impalad_client is not None:
        impalad_client.close()

  def _impala_role_checker(self, tbl_name, target_impalad, counter, writers):
    """Checks that the table doesn't contain other values than 1.

    Polls until all 'writers' have incremented 'counter'. The last check always runs
    after every writer has finished."""
    impalad_client = ImpalaTestSuite.create_client_for_nth_impalad(
        target_impalad % ImpalaTestSuite.get_impalad_cluster_size())
    try:
      while True:
        # Sample the counter before querying, so the final query sees the writers'
        # final state.
        writers_done = counter.value == writers
        result = impalad_client.execute("select * from %s where i != 1" % tbl_name)
        assert len(result.data) == 0, "Unexpected rows: %s" % result.data
        if writers_done:
          break
        time.sleep(0.5)
    finally:
      impalad_client.close()

  def _run_test_failing_inserts(self, unique_database, partitioned):
    """Tests that failing INSERTs cannot be observed."""
    tbl_name = "%s.test_inserts_fail" % unique_database
    self.client.set_configuration_option("SYNC_DDL", "true")
    self.client.execute("drop table if exists %s" % tbl_name)
    part_expr = "partitioned by (p int)" if partitioned else ""
    self.client.execute("""create table %s (i int) %s TBLPROPERTIES (
        'transactional_properties' = 'insert_only', 'transactional' = 'true')
        """ % (tbl_name, part_expr))

    counter = Value('i', 0)
    num_writers = 3
    num_checkers = 3

    writers = [Task(self._impala_role_insert, tbl_name, partitioned, i, counter)
               for i in xrange(0, num_writers)]
    checkers = [Task(self._impala_role_checker, tbl_name, i, counter, num_writers)
                for i in xrange(0, num_checkers)]
    run_tasks(writers + checkers)

  @SkipIfHive2.acid
  @pytest.mark.execute_serially
  @pytest.mark.stress
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_failing_inserts(self, unique_database):
    """Tests that failing INSERTs cannot be observed."""
    for is_partitioned in [False, True]:
      self._run_test_failing_inserts(unique_database, is_partitioned)