blob: 881496365f2595f75f3a7111a94bd712af69ae94 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIf, SkipIfNotHdfsMinicluster
class TestDataCache(CustomClusterTestSuite):
""" This test enables the data cache and verfies that cache hit and miss counts
in the runtime profile and metrics are as expected. Run on non-EC HDFS only as
this test checks the number of data cache hit counts, which implicitly relies
on the scheduler's behavior and number of HDFS blocks.
def get_workload(self):
return 'functional-query'
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestDataCache, cls).setup_class()
impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64"
" --cache_force_single_shard=true",
start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
def test_data_cache_deterministic(self, vector, unique_database):
""" This test creates a temporary table from another table, overwrites it with
some other data and verifies that no stale data is read from the cache. Runs with
a single node to make it easier to verify the runtime profile. Also enables higher
write concurrency and uses a single shard to avoid non-determinism.
self.run_test_case('QueryTest/data-cache', vector, unique_database)
assert self.get_metric('') >= 0
assert self.get_metric('') > 0
assert self.get_metric('') > 0
assert self.get_metric('') > 0
start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
def test_data_cache(self, vector):
""" This test scans the same table twice and verifies the cache hit count metrics
are correct. The exact number of bytes hit is non-deterministic between runs due
to different mtime of files and multiple shards in the cache.
QUERY = "select * from tpch_parquet.lineitem"
# Do a first run to warm up the cache. Expect no hits.
assert self.get_metric('') == 0
assert self.get_metric('') > 0
assert self.get_metric('') > 0
# Do a second run. Expect some hits.
assert self.get_metric('') > 0
impalad_args="--always_use_data_cache=true --data_cache_write_concurrency=64"
" --cache_force_single_shard=true",
start_args="--data_cache_dir=/tmp --data_cache_size=500MB", cluster_size=1)
def test_data_cache_disablement(self, vector):
# Verifies that the cache metrics are all zero.
assert self.get_metric('') == 0
assert self.get_metric('') == 0
assert self.get_metric('') == 0
# Runs a query with the cache disabled and then enabled against multiple file formats.
# Verifies that the metrics stay at zero when the cache is disabled.
for disable_cache in [True, False]:
vector.get_value('exec_option')['disable_data_cache'] = int(disable_cache)
for file_format in ['text_gzip', 'parquet', 'avro', 'seq', 'rc']:
QUERY = "select * from functional_{0}.alltypes".format(file_format)
self.execute_query(QUERY, vector.get_value('exec_option'))
assert disable_cache ==\
(self.get_metric('') == 0)
assert disable_cache ==\
(self.get_metric('') == 0)