blob: 15d3ec7f1eb19ea15418b1be1bed3350148b49c0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
import threading
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
class TestConcurrentDdls(CustomClusterTestSuite):
  """Test concurrent DDLs with invalidate metadata"""

  # NOTE: deliberately a plain function (no 'self', no @staticmethod): it is called while
  # the class body is still executing, as an argument to the with_args() decorators below.
  def _make_per_impalad_args(local_catalog_enabled):
    """Build a '--per_impalad_args=...' start argument from a list of booleans.

    Each entry is rendered as '--use_local_catalog=true' or '--use_local_catalog=false'
    and the entries are joined with ';' (presumably one entry per impalad, in order --
    confirm against the cluster start script).
    """
    assert isinstance(local_catalog_enabled, list)
    args = ['--use_local_catalog=%s' % str(e).lower()
            for e in local_catalog_enabled]
    return "--per_impalad_args=" + ";".join(args)

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=false",
    catalogd_args="--catalog_topic_mode=full")
  def test_ddls_with_invalidate_metadata(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)

  @pytest.mark.timeout(300)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=false",
    catalogd_args="--catalog_topic_mode=full")
  def test_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=True)

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    start_args=_make_per_impalad_args([True, False]),
    catalogd_args="--catalog_topic_mode=mixed")
  def test_mixed_catalog_ddls_with_invalidate_metadata(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)

  @pytest.mark.timeout(300)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    start_args=_make_per_impalad_args([True, False]),
    catalogd_args="--catalog_topic_mode=mixed")
  def test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=True)

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal")
  def test_local_catalog_ddls_with_invalidate_metadata(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=False)

  @pytest.mark.timeout(300)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal")
  def test_local_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database):
    self._run_ddls_with_invalidation(unique_database, sync_ddl=True)

  def _run_ddls_with_invalidation(self, db, sync_ddl=False):
    """Test INVALIDATE METADATA with concurrent DDLs to see if any queries hang.

    The DDL/DML sequence in run_ddls() is executed once on the calling thread and then
    NUM_ITERS more times on an 8-thread pool. Every sequence ends with a global
    INVALIDATE METADATA, so invalidations race with other threads' statements; errors
    caused by that race are tolerated via is_acceptable_error().

    Args:
      db: database in which the test tables are created.
      sync_ddl: if True, every client runs with the 'sync_ddl' query option set.
    """
    test_self = self
    # Every client created below, so all of them can be closed once the test is done.
    clients = []
    clients_lock = threading.Lock()

    class ThreadLocalClient(threading.local):
      # threading.local calls __init__ once in each thread that uses the instance, so
      # every thread gets (and keeps) its own client.
      def __init__(self):
        self.client = test_self.create_impala_client()
        if sync_ddl:
          self.client.set_configuration_option('sync_ddl', 'true')
        with clients_lock:
          clients.append(self.client)

    def run_ddls(i):
      tbl_name = db + ".test_" + str(i)
      for query_tmpl in [
        # Create a partitioned and unpartitioned table
        "create table %s (i int)",
        "create table %s_part (i int) partitioned by (j int)",
        # Below queries could fail if running with invalidate metadata concurrently
        "alter table %s_part add partition (j=1)",
        "alter table %s_part add partition (j=2)",
        "invalidate metadata %s_part",
        "refresh %s",
        "refresh %s_part",
        "insert overwrite table %s select int_col from functional.alltypestiny",
        "insert overwrite table %s_part partition(j=1) values (1), (2), (3), (4), (5)",
        "insert overwrite table %s_part partition(j=2) values (1), (2), (3), (4), (5)"
      ]:
        try:
          query = query_tmpl % tbl_name
          # TODO(IMPALA-9123): Timeout logic here does not work for DDLs since they are
          # usually stuck in CREATED state and execute_async() won't return. We finally
          # use timeout in pytest.mark.timeout() but it's not precise. We should find a
          # more elegant way to detect timeout of DDLs.
          handle = tls.client.execute_async(query)
          is_finished = tls.client.wait_for_finished_timeout(handle, timeout=60)
          assert is_finished, "Query timeout(60s): " + query
          tls.client.close_query(handle)
        except ImpalaBeeswaxException as e:
          # Could raise exception when running with INVALIDATE METADATA
          assert TestConcurrentDdls.is_acceptable_error(str(e), sync_ddl), str(e)
      # TODO(IMPALA-9123): Detect hangs here instead of using pytest.mark.timeout()
      self.execute_query_expect_success(tls.client, "invalidate metadata")

    pool = ThreadPool(processes=8)
    try:
      tls = ThreadLocalClient()
      # Run DDLs in single thread first. Some bugs causing DDL hangs can be hidden when
      # run with concurrent DDLs.
      run_ddls(0)
      # Run DDLs with invalidate metadata in parallel. Draining the iterator is what
      # re-raises any exception (e.g. a failed assertion) from a worker thread.
      NUM_ITERS = 16
      for _ in pool.imap_unordered(run_ddls, range(1, NUM_ITERS + 1)):
        pass
    finally:
      # Stop the worker threads before closing the clients they may still be using.
      pool.terminate()
      pool.join()
      for client in clients:
        client.close()

  @classmethod
  def is_acceptable_error(cls, err, sync_ddl):
    """Return True if 'err' is an error expected when racing with INVALIDATE METADATA.

    Args:
      err: the error message (string form of the raised exception).
      sync_ddl: whether the statement ran with SYNC_DDL; the SYNC_DDL broadcast error
        is only tolerated in that mode.
    """
    # DDL/DMLs may fail if running with invalidate metadata concurrently, since in-flight
    # table loadings can't finish if the target table is changed (e.g. reset to unloaded
    # state). See more in CatalogOpExecutor.getExistingTable().
    if "CatalogException: Table" in err and \
        "was modified while operation was in progress, aborting execution" in err:
      return True
    # TODO: Consider remove this case after IMPALA-9135 is fixed.
    # NOTE: this text is matched verbatim against the error (including the missing space
    # after "attempts."); do not reformat it without checking the server-side message.
    if sync_ddl and "Couldn't retrieve the catalog topic version for the SYNC_DDL " \
        "operation after 5 attempts.The operation has been successfully " \
        "executed but its effects may have not been broadcast to all the " \
        "coordinators." in err:
      return True
    return False

  @pytest.mark.timeout(120)
  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
    impalad_args="--use_local_catalog=true",
    catalogd_args="--catalog_topic_mode=minimal")
  def test_concurrent_invalidate_metadata(self):
    """Test that concurrent INVALIDATE METADATA requests do not hang."""

    def run_invalidate_metadata():
      # A dedicated client per call (not a thread-local one): the pool may run both tasks
      # on the same worker thread, and a thread-local client closed by the first task
      # would then be handed, already closed, to the second.
      client = self.create_impala_client()
      try:
        # TODO(IMPALA-9123): Detect hangs here instead of using pytest.mark.timeout
        self.execute_query_expect_success(client, "invalidate metadata")
      finally:
        client.close()

    NUM_ITERS = 20
    for _ in range(NUM_ITERS):
      # Run two INVALIDATE METADATA commands in parallel
      pool = ThreadPool(processes=2)
      try:
        r1 = pool.apply_async(run_invalidate_metadata)
        r2 = pool.apply_async(run_invalidate_metadata)
        try:
          r1.get(timeout=60)
          r2.get(timeout=60)
        except TimeoutError:
          assert False, "INVALIDATE METADATA timeout in 60s!"
      finally:
        # Always release the pool, including when the timeout assertion fires.
        pool.terminate()