blob: cde233560d20116e61dab84fe9c7f19e54e4070a [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import os
import pytest
import time
from subprocess import call
from tests.common.environ import ImpalaTestClusterProperties
from tests.common.skip import SkipIfFS
from tests.util.filesystem_utils import IS_HDFS, IS_LOCAL
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.event_processor_utils import EventProcessorUtils
class TestAutomaticCatalogInvalidation(CustomClusterTestSuite):
""" Test that tables are cached in the catalogd after usage for the configured time
and invalidated afterwards."""
query = "select count(*) from functional.alltypes"
# The following columns string presents in the catalog object iff the table loaded.
metadata_cache_string = "columns (list) = list<struct>"
url = "http://localhost:25020/catalog_object?object_type=TABLE&" \
"object_name=functional.alltypes"
# The test will run a query and assumes the table is loaded when the query finishes.
# The timeout should be larger than the time of the query.
timeout = 20 if ImpalaTestClusterProperties.get_instance().runs_slowly() or\
(not IS_HDFS and not IS_LOCAL) else 10
timeout_flag = "--invalidate_tables_timeout_s=" + str(timeout)
def _get_catalog_object(self):
""" Return the catalog object of functional.alltypes serialized to string. """
return self.cluster.catalogd.service.read_debug_webpage(
"catalog_object?object_type=TABLE&object_name=functional.alltypes")
def _run_test(self):
self.client.execute(self.query)
# The table is cached after usage.
assert self.metadata_cache_string in self._get_catalog_object()
# Wait 5 * table TTL for the invalidation to take effect.
max_wait_time = time.time() + self.timeout * 5
while True:
time.sleep(1)
# The table is eventually evicted.
if self.metadata_cache_string not in self._get_catalog_object():
return
assert time.time() < max_wait_time
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(catalogd_args=timeout_flag, impalad_args=timeout_flag)
def test_v1_catalog(self):
self._run_test()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args=timeout_flag + " --catalog_topic_mode=minimal",
impalad_args=timeout_flag + " --use_local_catalog")
def test_local_catalog(self):
self._run_test()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(catalogd_args="--invalidate_tables_timeout_s=1",
impalad_args="--invalidate_tables_timeout_s=1")
def test_invalid_table(self):
""" Regression test for IMPALA-7606. Tables failed to be loaded don't have a
last used time and shouldn't be considered for invalidation."""
self.execute_query_expect_failure(self.client, "select * from functional.bad_serde")
# The table expires after 1 second. Sleeping for another logbufsecs=5 seconds to wait
# for the log to be flushed. Wait 4 more seconds to reduce flakiness.
time.sleep(10)
assert "Unexpected exception thrown while attempting to automatically invalidate "\
"tables" not in open(os.path.join(self.impala_log_dir, "catalogd.INFO")).read()
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(
catalogd_args="--invalidate_tables_on_memory_pressure "
"--invalidate_tables_gc_old_gen_full_threshold=0 "
"--invalidate_tables_fraction_on_memory_pressure=1",
impalad_args="--invalidate_tables_on_memory_pressure")
def test_memory_pressure(self):
""" Test that memory-based invalidation kicks out all the tables after an GC."""
self.execute_query(self.query)
# This triggers a full GC as of openjdk 1.8.
call(["jmap", "-histo:live", str(self.cluster.catalogd.get_pid())])
# Sleep for logbufsecs=5 seconds to wait for the log to be flushed. Wait 5 more
# seconds to reduce flakiness.
time.sleep(10)
assert self.metadata_cache_string not in self._get_catalog_object()
@SkipIfFS.hive
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(catalogd_args=timeout_flag, impalad_args=timeout_flag)
def test_loaded_tables_metric(self, unique_database):
"""Test IMPALA-13863: catalog.num-loaded-tables metric tracks loaded tables
correctly across various metadata operations including loading, invalidation,
refresh, rename, and removal."""
metric_name = "catalog.num-loaded-tables"
catalogd = self.cluster.catalogd.service
# Test 1: Loading increases counter
self.execute_query("invalidate metadata")
catalogd.wait_for_metric_value(metric_name, 0)
self.execute_query(self.query)
catalogd.wait_for_metric_value(metric_name, 1)
# Test 2: Single table INVALIDATE METADATA decreases counter
self.execute_query("invalidate metadata functional.alltypes")
catalogd.wait_for_metric_value(metric_name, 0)
# Test 3: REFRESH loaded table (counter should stay same)
self.execute_query(self.query)
catalogd.wait_for_metric_value(metric_name, 1)
count_before_refresh = catalogd.get_metric_value(metric_name)
self.execute_query("refresh functional.alltypes")
# Wait for one metrics refresh cycle (REFRESH_METRICS_INTERVAL_MS)
# to ensure the metric is updated
time.sleep(1)
count_after_refresh = catalogd.get_metric_value(metric_name)
assert count_after_refresh == count_before_refresh, (
"Count should stay same after REFRESH of loaded table (was %d, now %d)"
% (count_before_refresh, count_after_refresh))
# Test 4: ALTER TABLE RENAME (counter decreases because old loaded table is
# removed and new table starts as IncompleteTable)
self.execute_query("create table %s.test_rename_tbl (id int)" % unique_database)
self.execute_query("select * from %s.test_rename_tbl" % unique_database)
catalogd.wait_for_metric_value(metric_name, 2)
self.execute_query("alter table %s.test_rename_tbl rename \
to %s.test_renamed_tbl" % (unique_database, unique_database))
catalogd.wait_for_metric_value(metric_name, 1)
# Verify that accessing the renamed table increments the counter
self.execute_query("select * from %s.test_renamed_tbl" % unique_database)
catalogd.wait_for_metric_value(metric_name, 2)
# Test 5: Load another table, then global INVALIDATE METADATA
self.execute_query("select count(*) from functional.alltypessmall")
catalogd.wait_for_metric_value(metric_name, 3)
self.execute_query("invalidate metadata")
catalogd.wait_for_metric_value(metric_name, 0)
# Test 6: CREATE TABLE, load it, then DROP TABLE
self.execute_query("create table %s.test_metric_tbl (id int)" % unique_database)
# Wait for one metrics refresh cycle (REFRESH_METRICS_INTERVAL_MS)
# to ensure the metric is updated
time.sleep(1)
count_after_create = catalogd.get_metric_value(metric_name)
assert count_after_create == 0, (
"Count should be 0 after creating table (got %d)" % count_after_create)
self.execute_query("select * from %s.test_metric_tbl" % unique_database)
catalogd.wait_for_metric_value(metric_name, 1)
self.execute_query("drop table %s.test_metric_tbl" % unique_database)
catalogd.wait_for_metric_value(metric_name, 0)
# Test 7: Hive-side DROP TABLE processed via events
self.execute_query("create table %s.hive_drop_tbl (id int, val string)"
% unique_database)
self.execute_query("select * from %s.hive_drop_tbl" % unique_database)
catalogd.wait_for_metric_value(metric_name, 1)
# Drop table from Hive side
self.run_stmt_in_hive("drop table %s.hive_drop_tbl" % unique_database)
EventProcessorUtils.wait_for_event_processing(self)
catalogd.wait_for_metric_value(metric_name, 0)
# Test 8: DROP DATABASE CASCADE with loaded table
test_db = ImpalaTestSuite.get_random_name("test_db_")
try:
self.execute_query("create database if not exists %s" % test_db)
self.execute_query("create table %s.t1 (id int, name string)" % test_db)
self.execute_query("insert into %s.t1 values (1, 'test')" % test_db)
self.execute_query("select * from %s.t1" % test_db)
catalogd.wait_for_metric_value(metric_name, 1)
finally:
self.execute_query("drop database %s cascade" % test_db)
catalogd.wait_for_metric_value(metric_name, 0)
# Test 9: Automatic timeout-based invalidation
self.execute_query(self.query)
catalogd.wait_for_metric_value(metric_name, 1)
assert self.metadata_cache_string in self._get_catalog_object()
# Wait for automatic timeout-based invalidation to complete and metric to update.
# ImpaladTableUsageTracker reports table usage with a delay of up to 15 seconds
# (1.5 * REPORT_INTERVAL_MS where REPORT_INTERVAL_MS=10s), then the invalidation
# TTL kicks in. Add ~1s extra for metric updates and ~1s buffer for RPC, serde, etc.
# Max time = 15s (max report delay) + self.timeout (TTL) + 2s
catalogd.wait_for_metric_value(metric_name, 0, timeout=self.timeout + 17)
# Verify that the table metadata was actually invalidated
assert self.metadata_cache_string not in self._get_catalog_object(), \
"Table metadata should be invalidated after timeout"