# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from signal import SIGRTMIN
from time import sleep

import pytest

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
from tests.common.test_vector import HS2


@SkipIf.is_buggy_el6_kernel
@SkipIfNotHdfsMinicluster.scheduling
class TestDataCache(CustomClusterTestSuite):
  """ This test enables the data cache and verifies that cache hit and miss counts
  in the runtime profile and metrics are as expected. Run on non-EC HDFS only as
  this test checks the number of data cache hit counts, which implicitly relies
  on the scheduler's behavior and number of HDFS blocks.
  """

  @classmethod
  def setup_class(cls):
    """ Skips the whole suite unless the exploration strategy is 'exhaustive'. """
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip('runs only in exhaustive')
    super(TestDataCache, cls).setup_class()

  def get_impalad_args(eviction_policy, high_write_concurrency=True,
                       force_single_shard=True, keep_across_restarts=False):
    """ Builds the space-separated impalad flag string used by the tests below.

    Deliberately takes no 'self': it is called as a plain function while the class
    body is being evaluated, to compute the 'impalad_args' passed to the
    'with_args' decorators.

    eviction_policy: value for --data_cache_eviction_policy (the tests pass "LRU"
        or "LIRS").
    high_write_concurrency: if true, adds --data_cache_write_concurrency=64.
    force_single_shard: if true, adds --cache_force_single_shard.
    keep_across_restarts: if true, adds --data_cache_keep_across_restarts=true and
        --shutdown_grace_period_s=1.
    """
    impalad_args = ["--always_use_data_cache=true"]
    if high_write_concurrency:
      impalad_args.append("--data_cache_write_concurrency=64")
    if force_single_shard:
      impalad_args.append("--cache_force_single_shard")
    if keep_across_restarts:
      impalad_args.append("--data_cache_keep_across_restarts=true")
      # NOTE(review): presumably keeps the graceful shutdown issued by the restart
      # tests short - confirm.
      impalad_args.append("--shutdown_grace_period_s=1")
    impalad_args.append("--data_cache_eviction_policy={0}".format(eviction_policy))
    return " ".join(impalad_args)

  def get_data_cache_metric(self, suffix):
    """ Returns the value of the 'impala-server.io-mgr.remote-data-cache-<suffix>'
    metric as reported by self.get_metric().
    """
    return self.get_metric('impala-server.io-mgr.remote-data-cache-' + suffix)

  # Cluster start arguments shared by most tests in this class.
  CACHE_START_ARGS = "--data_cache_dir=/tmp --data_cache_size=500MB"

  def __test_data_cache_deterministic(self, vector, unique_database):
    """ This test creates a temporary table from another table, overwrites it with
    some other data and verifies that no stale data is read from the cache. Runs with
    a single node to make it easier to verify the runtime profile. Also enables higher
    write concurrency and uses a single shard to avoid non-determinism.
    """
    self.run_test_case('QueryTest/data-cache', vector, unique_database)
    assert self.get_data_cache_metric('dropped-bytes') >= 0
    assert self.get_data_cache_metric('dropped-entries') >= 0
    assert self.get_data_cache_metric('instant-evictions') >= 0
    assert self.get_data_cache_metric('hit-bytes') > 0
    assert self.get_data_cache_metric('hit-count') > 0
    assert self.get_data_cache_metric('miss-bytes') > 0
    assert self.get_data_cache_metric('miss-count') > 0
    assert self.get_data_cache_metric('total-bytes') > 0
    assert self.get_data_cache_metric('num-entries') > 0
    assert self.get_data_cache_metric('num-writes') > 0

    # Expect that a query served entirely from the cache opens no new files, i.e.
    # the file handle miss count does not move.
    opened_file_handles_metric = 'impala-server.io.mgr.cached-file-handles-miss-count'
    baseline = self.get_metric(opened_file_handles_metric)
    self.execute_query("select count(distinct l_orderkey) from {0}.test_parquet".format(
        unique_database))
    assert self.get_metric(opened_file_handles_metric) == baseline

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_deterministic_lru(self, vector, unique_database):
    """ Deterministic cache test with the LRU eviction policy. """
    self.__test_data_cache_deterministic(vector, unique_database)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_deterministic_lirs(self, vector, unique_database):
    """ Deterministic cache test with the LIRS eviction policy. """
    self.__test_data_cache_deterministic(vector, unique_database)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU") + " --max_cached_file_handles=0",
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_deterministic_no_file_handle_cache(self, vector, unique_database):
    """ Deterministic cache test (LRU) with --max_cached_file_handles=0. """
    self.__test_data_cache_deterministic(vector, unique_database)

  def __test_data_cache(self):
    """ This test scans the same table twice and verifies the cache hit count metrics
    are correct. The exact number of bytes hit is non-deterministic between runs due
    to different mtime of files and multiple shards in the cache.
    """
    QUERY = "select * from tpch_parquet.lineitem"
    # Do a first run to warm up the cache. Expect no hits.
    self.execute_query(QUERY)
    assert self.get_data_cache_metric('hit-bytes') == 0
    assert self.get_data_cache_metric('hit-count') == 0
    assert self.get_data_cache_metric('miss-bytes') > 0
    assert self.get_data_cache_metric('miss-count') > 0
    assert self.get_data_cache_metric('total-bytes') > 0
    assert self.get_data_cache_metric('num-entries') > 0
    assert self.get_data_cache_metric('num-writes') > 0

    # Do a second run. Expect some hits.
    self.execute_query(QUERY)
    assert self.get_data_cache_metric('hit-bytes') > 0
    assert self.get_data_cache_metric('hit-count') > 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", high_write_concurrency=False,
                                    force_single_shard=False),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_lru(self):
    """ Warm-up/re-scan cache test with the LRU eviction policy. """
    self.__test_data_cache()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", high_write_concurrency=False,
                                    force_single_shard=False),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_lirs(self):
    """ Warm-up/re-scan cache test with the LIRS eviction policy. """
    self.__test_data_cache()

  def __test_data_cache_disablement(self, vector):
    """ Verifies that the 'disable_data_cache' query option keeps the cache metrics
    at zero, and that the metrics become non-zero once the option is turned off.
    Note that this mutates the 'exec_option' dict held by 'vector'.
    """
    # Verifies that the cache metrics are all zero before any query has run.
    assert self.get_data_cache_metric('hit-bytes') == 0
    assert self.get_data_cache_metric('hit-count') == 0
    assert self.get_data_cache_metric('miss-bytes') == 0
    assert self.get_data_cache_metric('miss-count') == 0
    assert self.get_data_cache_metric('total-bytes') == 0
    assert self.get_data_cache_metric('num-entries') == 0
    assert self.get_data_cache_metric('num-writes') == 0

    # Runs a query with the cache disabled and then enabled against multiple file
    # formats. Verifies that the metrics stay at zero when the cache is disabled.
    for disable_cache in [True, False]:
      vector.get_value('exec_option')['disable_data_cache'] = int(disable_cache)
      for file_format in ['text_gzip', 'parquet', 'avro', 'seq', 'rc']:
        QUERY = "select * from functional_{0}.alltypes".format(file_format)
        self.execute_query(QUERY, vector.get_value('exec_option'))
        # Each metric must be zero exactly when the cache is disabled.
        assert disable_cache == (self.get_data_cache_metric('miss-bytes') == 0)
        assert disable_cache == (self.get_data_cache_metric('miss-count') == 0)
        assert disable_cache == (self.get_data_cache_metric('total-bytes') == 0)
        assert disable_cache == (self.get_data_cache_metric('num-entries') == 0)
        assert disable_cache == (self.get_data_cache_metric('num-writes') == 0)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_disablement_lru(self, vector):
    """ Cache disablement test with the LRU eviction policy. """
    self.__test_data_cache_disablement(vector)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS"),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_disablement_lirs(self, vector):
    """ Cache disablement test with the LIRS eviction policy. """
    self.__test_data_cache_disablement(vector)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", high_write_concurrency=False),
      start_args="--data_cache_dir=/tmp --data_cache_size=9MB",
      cluster_size=1)
  def test_data_cache_lirs_instant_evictions(self):
    """ Verifies that LIRS instant evictions are counted and that repeated instant
    evictions never drive the cache counters negative.
    """
    # The setup for this test is intricate. For Allocate() to succeed, the request
    # needs to be smaller than the protected size (95% of the cache). For Insert() to
    # fail, the request needs to be larger than the unprotected size (5% of the cache).
    # So, for an 8MB cache store to fail, the cache needs to be > 8.4MB (8MB / 0.95)
    # and less than 160MB (8MB / 0.05). This sets it to 9MB, which should result in
    # 8MB cache inserts to be instantly evicted.
    QUERY = "select count(*) from tpch.lineitem"
    self.execute_query(QUERY)
    assert self.get_data_cache_metric('miss-bytes') > 0
    assert self.get_data_cache_metric('miss-count') > 0
    assert self.get_data_cache_metric('total-bytes') >= 0
    assert self.get_data_cache_metric('num-entries') >= 0
    assert self.get_data_cache_metric('num-writes') >= 0
    assert self.get_data_cache_metric('instant-evictions') > 0

    # Run the query multiple times and verify that none of the counters go negative.
    instant_evictions_before = self.get_data_cache_metric('instant-evictions')
    for _ in range(10):
      self.execute_query(QUERY)
    instant_evictions_after = self.get_data_cache_metric('instant-evictions')
    assert instant_evictions_after - instant_evictions_before > 0

    # All the counters remain non-negative.
    assert self.get_data_cache_metric('num-entries') >= 0
    assert self.get_data_cache_metric('num-writes') >= 0
    assert self.get_data_cache_metric('total-bytes') >= 0

  def __test_data_cache_keep_across_restarts(self, test_reduce_size=False):
    """ Warms the cache, gracefully restarts the impalad and verifies that the cache
    content is still there after the restart.

    test_reduce_size: if true, the impalad is restarted with a cache capacity of 4/5
        of the bytes cached before the restart, and the test expects the cache to
        come back smaller and the follow-up query to see both hits and misses.
    """
    QUERY = "select * from tpch_parquet.lineitem"
    # Execute a query, record the total bytes and the number of entries of cache before
    # cache dump.
    with self.create_impala_client(protocol=HS2) as client1:
      client1.execute(QUERY)
    assert self.get_data_cache_metric('hit-bytes') == 0
    assert self.get_data_cache_metric('hit-count') == 0
    total_bytes = self.get_data_cache_metric('total-bytes')
    num_entries = self.get_data_cache_metric('num-entries')

    # Do graceful restart and, if necessary, reduce the cache size by 1/5.
    impalad = self.cluster.impalads[0]
    impalad.kill_and_wait_for_exit(SIGRTMIN)
    new_size = 4 * total_bytes // 5
    if test_reduce_size:
      # NOTE(review): the path presumably mirrors --data_cache_dir=/tmp from
      # CACHE_START_ARGS - confirm if the start arguments ever change.
      impalad.modify_argument('-data_cache', '/tmp/impala-datacache-0:' + str(new_size))
    impalad.start()
    impalad.service.wait_for_num_known_live_backends(1)

    # After the restart, we expect the cache to have the same total bytes
    # and number of entries as before the restart, and if the cache size is reduced,
    # the metrics should be reduced accordingly.
    if test_reduce_size:
      assert self.get_data_cache_metric('total-bytes') <= new_size
      assert self.get_data_cache_metric('num-entries') < num_entries
    else:
      assert self.get_data_cache_metric('total-bytes') == total_bytes
      assert self.get_data_cache_metric('num-entries') == num_entries

    # Reconnect to the service and execute the query, expecting some cache hits.
    with self.create_impala_client(protocol=HS2) as client2:
      client2.execute(QUERY)
    assert self.get_data_cache_metric('hit-bytes') > 0
    assert self.get_data_cache_metric('hit-count') > 0
    if test_reduce_size:
      # Part of the data no longer fits, so some reads must miss.
      assert self.get_data_cache_metric('miss-bytes') > 0
      assert self.get_data_cache_metric('miss-count') > 0
    else:
      assert self.get_data_cache_metric('miss-bytes') == 0
      assert self.get_data_cache_metric('miss-count') == 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_keep_across_restarts_lru(self):
    """ Restart test with the LRU eviction policy and unchanged cache size. """
    self.__test_data_cache_keep_across_restarts()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_keep_across_restarts_lirs(self):
    """ Restart test with the LIRS eviction policy and unchanged cache size. """
    self.__test_data_cache_keep_across_restarts()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_reduce_size_restarts_lru(self):
    """ Restart test with the LRU eviction policy and a reduced cache size. """
    self.__test_data_cache_keep_across_restarts(test_reduce_size=True)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1)
  def test_data_cache_reduce_size_restarts_lirs(self):
    """ Restart test with the LIRS eviction policy and a reduced cache size. """
    self.__test_data_cache_keep_across_restarts(test_reduce_size=True)

  def __test_data_cache_readonly(self):
    """ Gracefully shuts the impalad down while a query is still running, restarts it
    and verifies that the cache was loaded again and serves hits.
    """
    QUERY = "select * from tpch_parquet.lineitem"
    # Execute the query asynchronously, wait a short while, and then shut down
    # gracefully right away to test the race between cache writes and setting the
    # cache read-only.
    with self.create_impala_client(protocol=HS2) as client1:
      handle = client1.execute_async(QUERY)
      sleep(1)
      impalad = self.cluster.impalads[0]
      impalad.kill(SIGRTMIN)
      client1.fetch(QUERY, handle)
      client1.close_query(handle)
    impalad.wait_for_exit()
    impalad.start()
    impalad.service.wait_for_num_known_live_backends(1)

    # Even in this case the cache is expected to be dumped and loaded properly, so
    # running the same query again should produce some cache hits.
    self.assert_impalad_log_contains('INFO', 'Partition 0 load successfully.')
    with self.create_impala_client(protocol=HS2) as client2:
      client2.execute(QUERY)
    assert self.get_data_cache_metric('hit-bytes') > 0
    assert self.get_data_cache_metric('hit-count') > 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LRU", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
  def test_data_cache_readonly_lru(self):
    """ Shutdown-during-query test with the LRU eviction policy. """
    self.__test_data_cache_readonly()

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args=get_impalad_args("LIRS", keep_across_restarts=True),
      start_args=CACHE_START_ARGS, cluster_size=1, disable_log_buffering=True)
  def test_data_cache_readonly_lirs(self):
    """ Shutdown-during-query test with the LIRS eviction policy. """
    self.__test_data_cache_readonly()