blob: 30069fed2c276548d760fabe640aff7a4488f3a6 [file] [log] [blame]
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from multiprocessing.pool import ThreadPool
import pytest
import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfS3, SkipIfGCS
@SkipIfS3.variable_listing_times
@SkipIfGCS.variable_listing_times
class TestTopicUpdateFrequency(CustomClusterTestSuite):
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args="--topic_update_tbl_max_wait_time_ms=500")
def test_topic_updates_unblock(self):
"""Test to simulate query blocking conditions as per IMPALA-6671
and makes sure that unrelated queries are not blocked by other long running
queries which block topic updates."""
# queries that we don't expect to block when a slow running blocking query is
# running in parallel. We want these queries to request the metadata from catalogd
# and hence the init queries invalidate the metadata before each test case run below.
non_blocking_queries = [
# each of these take about 2-4 seconds when there is no lock contention.
"describe functional.emptytable",
"select * from functional.tinytable limit 1",
"show partitions functional.alltypessmall",
]
# queries used to reset the metadata of the non_blocking_queries so that they will
# reload the next time they are executed
init_queries = [
"invalidate metadata functional.emptytable",
"invalidate metadata functional.tinytable",
"invalidate metadata functional.alltypessmall",
]
# make sure that the blocking query metadata is loaded in catalogd since table lock
# is only acquired on loaded tables.
self.client.execute("refresh tpcds.store_sales")
self.client.execute("refresh functional.alltypes")
# add the debug actions so that blocking queries take long time complete while
# holding the table lock. These debug actions are tuned such that each of the blocking
# queries below take little more than 10 seconds (2x slower than fast queries).
debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30|catalogd_table_recover_delay:SLEEP@10000|catalogd_update_stats_delay:SLEEP@10000"
blocking_query_options = {
"debug_action": debug_action,
"sync_ddl": "false"
}
blocking_queries = [
"refresh tpcds.store_sales",
"alter table tpcds.store_sales recover partitions",
"compute stats functional.alltypes"
]
for blocking_query in blocking_queries:
print("Running blocking query: {0}".format(blocking_query))
# blocking query is without sync_ddl
blocking_query_options["sync_ddl"] = "false"
self.__run_topic_update_test(blocking_query,
non_blocking_queries, init_queries, blocking_query_options=blocking_query_options)
# blocking query is with sync_ddl
blocking_query_options["sync_ddl"] = "true"
self.__run_topic_update_test(blocking_query,
non_blocking_queries, init_queries, blocking_query_options=blocking_query_options)
non_blocking_query_options = {
"sync_ddl": "true",
}
self.__run_topic_update_test(blocking_query,
non_blocking_queries, init_queries, blocking_query_options=blocking_query_options,
non_blocking_query_options=non_blocking_query_options)
blocking_query_options["sync_ddl"] = "false"
self.__run_topic_update_test(blocking_query,
non_blocking_queries, init_queries, blocking_query_options=blocking_query_options,
non_blocking_query_options=non_blocking_query_options)
def __run_topic_update_test(self, slow_blocking_query, fast_queries,
init_queries, blocking_query_options,
non_blocking_query_options=None, blocking_query_min_time=10000,
fast_query_timeout_ms=6000, non_blocking_impalad=0,
expect_topic_updates_to_block=False):
"""This function runs the slow query in a Impala client and then creates separate
Impala clients to run the fast_queries. It makes sure that the
fast_queries don't get blocked by the slow query by making sure that
fast_queries return back before the blocking query within a given expected
timeout."""
assert fast_query_timeout_ms < blocking_query_min_time
# run the init queries first in sync_ddl mode so that all the impalads are starting
# with a clean state.
for q in init_queries:
self.execute_query(q)
pool = ThreadPool(processes=len(fast_queries))
slow_query_pool = ThreadPool(processes=len(slow_blocking_query))
# run the slow query on the impalad-0 with the given query options
slow_query_future = slow_query_pool.apply_async(self.exec_and_time,
args=(slow_blocking_query, blocking_query_options, 0))
# there is no other good way other than to wait for some time to make sure that
# the slow query has been submitted and being compiled before we start the
# non_blocking queries
time.sleep(1)
fast_query_futures = {}
for fast_query in fast_queries:
# run other queries on second impalad
fast_query_futures[fast_query] = pool.apply_async(self.exec_and_time,
args=(fast_query, non_blocking_query_options, non_blocking_impalad))
for fast_query in fast_query_futures:
if not expect_topic_updates_to_block:
assert fast_query_futures[
fast_query].get() < fast_query_timeout_ms, \
"{0} did not complete within {1} msec".format(fast_query, fast_query_timeout_ms)
else:
# topic updates are expected to block and hence all the other queries should run
# only after blocking query finishes.
fast_query_futures[
fast_query].get() > blocking_query_min_time, \
"{0} did not complete within {1} msec".format(fast_query, fast_query_timeout_ms)
# make sure that the slow query exceeds the given timeout; otherwise the test
# doesn't make much sense.
assert slow_query_future.get() > blocking_query_min_time, \
"{0} query took less time than {1} msec".format(slow_blocking_query,
blocking_query_min_time)
# we wait for some time here to make sure that the topic updates from the last
# query have been propagated so that next run of this method starts from a clean
# state.
time.sleep(2)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args="--topic_update_tbl_max_wait_time_ms=500")
def test_topic_updates_advance(self):
"""Test make sure that a if long running blocking queries are run continuously
topic-update thread is not starved and it eventually blocks until it acquires a table
lock."""
# Each of these queries take complete about 30s with the debug action delays
# below.
blocking_queries = [
"refresh tpcds.store_sales",
"alter table tpcds.store_sales recover partitions",
"compute stats functional.alltypes"
]
debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30|catalogd_table_recover_delay:SLEEP@10000|catalogd_update_stats_delay:SLEEP@10000"
# loop in sync_ddl mode so that we know the topic updates are being propagated.
blocking_query_options = {
"debug_action": debug_action,
"sync_ddl": "true"
}
self.__run_loop_test(blocking_queries, blocking_query_options, 60000)
def __run_loop_test(self, blocking_queries, blocking_query_options, timeout):
"""Runs the given list of queries with given query options in a loop
and makes sure that they complete without any errors."""
slow_query_pool = ThreadPool(processes=len(blocking_queries))
# run the slow query on the impalad-0 with the given query options
slow_query_futures = {}
for q in blocking_queries:
print("Running blocking query {0}".format(q))
slow_query_futures[q] = slow_query_pool.apply_async(self.loop_exec,
args=(q, blocking_query_options))
for q in slow_query_futures:
# make sure that queries complete eventually.
durations = slow_query_futures[q].get()
for i in range(len(durations)):
assert durations[i] < timeout, "Query {0} iteration {1} did " \
"not complete within {2}.".format(q, i, timeout)
def loop_exec(self, query, query_options, iterations=3, impalad=0):
durations = []
for iter in range(iterations):
durations.append(self.exec_and_time(query, query_options, impalad))
return durations
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args="--topic_update_tbl_max_wait_time_ms=0")
def test_topic_lock_timeout_disabled(self):
"""Test makes sure that the topic update thread blocks until tables are
added to each topic update when topic_update_tbl_max_wait_time_ms is set to 0"""
# queries that we don't expect to block when a slow running blocking query is
# running in parallel. We want these queries to request the metadata from catalogd
# and hence the init queries invalidate the metadata before each test case run below.
non_blocking_queries = [
# each of these take about 2-4 seconds when there is no lock contention.
"describe functional.emptytable"
]
# queries used to reset the metadata of the non_blocking_queries so that they will
# reload the next time they are executed
init_queries = [
"invalidate metadata functional.emptytable",
]
# make sure that the blocking query metadata is loaded in catalogd since table lock
# is only acquired on loaded tables.
blocking_query = "refresh tpcds.store_sales"
debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
self.client.execute(blocking_query)
blocking_query_options = {
"debug_action": debug_action,
"sync_ddl": "false"
}
self.__run_topic_update_test(blocking_query,
non_blocking_queries, init_queries, blocking_query_options=blocking_query_options,
expect_topic_updates_to_block=True)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args="--topic_update_tbl_max_wait_time_ms=120000 "
"--max_wait_time_for_sync_ddl_s=5")
def test_topic_sync_ddl_error(self, unique_database):
"""Test makes sure that if a sync ddl query on a unrelated table is executed with
timeout, it errors out if table is not published in topic updates within the timeout
value."""
sync_ddl_query = "create table {0}.{1} (c int)".format(unique_database, "test1")
sync_ddl_query_2 = "create table {0}.{1} (c int)".format(unique_database, "test2")
blocking_query = "refresh tpcds.store_sales"
debug_action = "catalogd_refresh_hdfs_listing_delay:SLEEP@30"
self.client.execute(blocking_query)
blocking_query_options = {
"debug_action": debug_action,
"sync_ddl": "false"
}
slow_query_pool = ThreadPool(processes=1)
# run the slow query on the impalad-0 with the given query options
slow_query_future = slow_query_pool.apply_async(self.exec_and_time,
args=(blocking_query, blocking_query_options, 0))
# wait until the slow query is executing and blocking the topic update thread
# to avoid any race conditions in the test
time.sleep(1)
# now run the sync ddl query; we should expect this sync ddl query to fail
# since the timeout value is too low and topic update thread doesn't get unblocked
# before the timeout.
self.execute_query_expect_failure(self.client, sync_ddl_query, {"sync_ddl": "true"})
# wait for the slow query to complete
slow_query_future.get()
# if query is not sync ddl it should not error out
slow_query_future = slow_query_pool.apply_async(self.exec_and_time,
args=(blocking_query, blocking_query_options, 0))
# wait until the slow query is executing and blocking the topic update thread
# to avoid any race conditions in the test
time.sleep(1)
self.execute_query_expect_success(self.client, sync_ddl_query_2,
{"sync_ddl": "false"})
slow_query_future.get()