| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import pytest |
| import threading |
| |
| from multiprocessing.pool import ThreadPool |
| from multiprocessing import TimeoutError |
| |
| from tests.beeswax.impala_beeswax import ImpalaBeeswaxException |
| from tests.common.custom_cluster_test_suite import CustomClusterTestSuite |
| |
| |
class TestConcurrentDdls(CustomClusterTestSuite):
  """Test concurrent DDLs with invalidate metadata"""

  def _make_per_impalad_args(local_catalog_enabled):
    """Build a '--per_impalad_args=...' start argument toggling --use_local_catalog.

    This is deliberately a plain function without 'self': it is only called from the
    class body while the decorators below are evaluated, before the class exists.

    local_catalog_enabled: list of booleans; each element becomes one ';'-separated
      '--use_local_catalog=true|false' entry.
    Returns e.g. "--per_impalad_args=--use_local_catalog=true;--use_local_catalog=false".
    """
    assert isinstance(local_catalog_enabled, list)
    args = ['--use_local_catalog=%s' % str(e).lower()
            for e in local_catalog_enabled]
    return "--per_impalad_args=" + ";".join(args)

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=false",
    catalogd_args="--catalog_topic_mode=full")
  def test_ddls_with_invalidate_metadata(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)

  @pytest.mark.timeout(300)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=false",
    catalogd_args="--catalog_topic_mode=full")
  def test_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=True)

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    start_args=_make_per_impalad_args([True, False]),
    catalogd_args="--catalog_topic_mode=mixed")
  def test_mixed_catalog_ddls_with_invalidate_metadata(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)

  @pytest.mark.timeout(300)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    start_args=_make_per_impalad_args([True, False]),
    catalogd_args="--catalog_topic_mode=mixed")
  def test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=True)

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal")
  def test_local_catalog_ddls_with_invalidate_metadata(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)

  @pytest.mark.timeout(300)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal")
  def test_local_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=True)

  def _run_ddls_with_invalidation(self, db, sync_ddl=False):
    """Test INVALIDATE METADATA with concurrent DDLs to see if any queries hang.

    A fixed DDL/DML sequence (each followed by a global INVALIDATE METADATA) is run
    once serially, then for 16 more table sets from an 8-thread pool. Each thread
    uses its own client; all clients and the pool are cleaned up on exit.

    db: database in which the test tables are created.
    sync_ddl: if True, every client sets the 'sync_ddl' query option.
    """
    test_self = self
    # Every client created below, so they can all be closed when the test ends.
    # list.append is atomic, so worker threads may append concurrently.
    clients = []

    class ThreadLocalClient(threading.local):
      # __init__ runs once per thread, on first attribute access from that thread
      # (and immediately in the constructing thread).
      def __init__(self):
        self.client = test_self.create_impala_client()
        clients.append(self.client)
        if sync_ddl:
          self.client.set_configuration_option('sync_ddl', 'true')

    query_templates = (
      # Create a partitioned and unpartitioned table
      "create table %s (i int)",
      "create table %s_part (i int) partitioned by (j int)",
      # Below queries could fail if running with invalidate metadata concurrently
      "alter table %s_part add partition (j=1)",
      "alter table %s_part add partition (j=2)",
      "invalidate metadata %s_part",
      "refresh %s",
      "refresh %s_part",
      "insert overwrite table %s select int_col from functional.alltypestiny",
      "insert overwrite table %s_part partition(j=1) values (1), (2), (3), (4), (5)",
      "insert overwrite table %s_part partition(j=2) values (1), (2), (3), (4), (5)",
    )

    pool = ThreadPool(processes=8)
    try:
      tls = ThreadLocalClient()

      def run_ddls(i):
        """Run the statement sequence on '<db>.test_<i>' and '<db>.test_<i>_part',
        then issue a global INVALIDATE METADATA."""
        tbl_name = db + ".test_" + str(i)
        for query_tmpl in query_templates:
          try:
            query = query_tmpl % tbl_name
            # TODO(IMPALA-9123): Timeout logic here does not work for DDLs since they
            # are usually stuck in CREATED state and execute_async() won't return. We
            # finally use timeout in pytest.mark.timeout() but it's not precise. We
            # should find a more elegant way to detect timeout of DDLs.
            handle = tls.client.execute_async(query)
            is_finished = tls.client.wait_for_finished_timeout(handle, timeout=60)
            assert is_finished, "Query timeout(60s): " + query
            tls.client.close_query(handle)
          except ImpalaBeeswaxException as e:
            # Could raise exception when running with INVALIDATE METADATA
            assert TestConcurrentDdls.is_acceptable_error(str(e), sync_ddl), str(e)
        # TODO(IMPALA-9123): Detect hangs here instead of using pytest.mark.timeout()
        self.execute_query_expect_success(tls.client, "invalidate metadata")

      # Run DDLs in single thread first. Some bugs causing DDL hangs can be hidden when
      # run with concurrent DDLs.
      run_ddls(0)

      # Run DDLs with invalidate metadata in parallel. Draining the iterator re-raises
      # the first exception raised by any worker.
      NUM_ITERS = 16
      for _ in pool.imap_unordered(run_ddls, range(1, NUM_ITERS + 1)):
        pass
    finally:
      # Previously neither the pool nor any client was ever released.
      pool.terminate()
      for client in clients:
        client.close()

  @classmethod
  def is_acceptable_error(cls, err, sync_ddl):
    """Return True if 'err' is an error expected when racing with INVALIDATE METADATA.

    err: text of the exception raised by the client.
    sync_ddl: whether the query ran with SYNC_DDL; only then is the catalog-topic
      version failure tolerated.
    """
    # DDL/DMLs may fail if running with invalidate metadata concurrently, since in-flight
    # table loadings can't finish if the target table is changed (e.g. reset to unloaded
    # state). See more in CatalogOpExecutor.getExistingTable().
    if ("CatalogException: Table" in err and
        "was modified while operation was in progress, aborting execution" in err):
      return True
    # TODO: Consider remove this case after IMPALA-9135 is fixed.
    # NOTE(review): the missing space in "attempts.The" presumably mirrors the server's
    # message verbatim; confirm against the catalog code before "fixing" it here.
    if sync_ddl and ("Couldn't retrieve the catalog topic version for the SYNC_DDL "
                     "operation after 5 attempts.The operation has been successfully "
                     "executed but its effects may have not been broadcast to all the "
                     "coordinators." in err):
      return True
    return False

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal")
  def test_concurrent_invalidate_metadata(self):
    """Test that concurrent INVALIDATE METADATA requests do not hang"""

    def run_invalidate_metadata():
      # A dedicated client per call: the previous thread-local client was closed after
      # use but stayed cached, so a pool thread running a second task would reuse a
      # closed connection. try/finally also closes it when the query fails.
      client = self.create_impala_client()
      try:
        # TODO(IMPALA-9123): Detect hangs here instead of using pytest.mark.timeout
        self.execute_query_expect_success(client, "invalidate metadata")
      finally:
        client.close()

    NUM_ITERS = 20
    for _ in range(NUM_ITERS):
      # Run two INVALIDATE METADATA commands in parallel
      pool = ThreadPool(processes=2)
      try:
        r1 = pool.apply_async(run_invalidate_metadata)
        r2 = pool.apply_async(run_invalidate_metadata)
        try:
          r1.get(timeout=60)
          r2.get(timeout=60)
        except TimeoutError:
          pytest.fail("INVALIDATE METADATA timeout in 60s!")
      finally:
        # Terminate even when a timeout or worker failure aborts the iteration.
        pool.terminate()