blob: 107b33a531af7abda690546efbf4078b812cf393 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Validates limit on scan nodes
import pytest
import re
import time
from subprocess import check_call
from tests.common.environ import build_flavor_timeout, IS_DOCKERIZED_TEST_CLUSTER
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.common.skip import (SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfIsilon,
SkipIfLocal, SkipIfEC, SkipIfDockerizedCluster, SkipIfCatalogV2)
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.util.filesystem_utils import get_fs_path
from tests.util.shell_util import exec_process
# End to end test that hdfs caching is working.
@SkipIfS3.caching # S3: missing coverage: verify SET CACHED gives error
@SkipIfABFS.caching
@SkipIfADLS.caching
@SkipIfIsilon.caching
@SkipIfLocal.caching
@SkipIfEC.fix_later
class TestHdfsCaching(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'tpch'
@classmethod
def add_test_dimensions(cls):
super(TestHdfsCaching, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('exec_option')['batch_size'] == 0)
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format == "text")
# The tpch nation table is cached as part of data loading. We'll issue a query
# against this table and verify the metric is updated correctly.
@pytest.mark.execute_serially
def test_table_is_cached(self, vector):
cached_read_metric = "impala-server.io-mgr.cached-bytes-read"
query_string = "select count(*) from tpch.nation"
expected_bytes_delta = 2199
impala_cluster = ImpalaCluster.get_e2e_test_cluster()
# Collect the cached read metric on all the impalads before running the query
cached_bytes_before = list()
for impalad in impala_cluster.impalads:
cached_bytes_before.append(impalad.service.get_metric_value(cached_read_metric))
# Execute the query.
result = self.execute_query(query_string)
assert(len(result.data) == 1)
assert(result.data[0] == '25')
# Read the metrics again.
cached_bytes_after = list()
for impalad in impala_cluster.impalads:
cached_bytes_after.append(impalad.service.get_metric_value(cached_read_metric))
# Verify that the cached bytes increased by the expected number on exactly one of
# the impalads.
num_metrics_increased = 0
assert(len(cached_bytes_before) == len(cached_bytes_after))
for i in range(0, len(cached_bytes_before)):
assert(cached_bytes_before[i] == cached_bytes_after[i] or\
cached_bytes_before[i] + expected_bytes_delta == cached_bytes_after[i])
if cached_bytes_after[i] > cached_bytes_before[i]:
num_metrics_increased = num_metrics_increased + 1
if IS_DOCKERIZED_TEST_CLUSTER:
assert num_metrics_increased == 0, "HDFS caching is disabled in dockerised cluster."
elif num_metrics_increased != 1:
# Test failed, print the metrics
for i in range(0, len(cached_bytes_before)):
print "%d %d" % (cached_bytes_before[i], cached_bytes_after[i])
assert(False)
def test_cache_cancellation(self, vector):
""" This query runs on some mix of cached and not cached tables. The query has
a limit so it exercises the cancellation paths. Regression test for
IMPALA-1019. """
num_iters = 100
query_string = """
with t1 as (select int_col x, bigint_col y from functional.alltypes limit 2),
t2 as (select int_col x, bigint_col y from functional.alltypestiny limit 2),
t3 as (select int_col x, bigint_col y from functional.alltypessmall limit 2)
select * from t1, t2, t3 where t1.x = t2.x and t2.x = t3.x """
# Run this query for some iterations since it is timing dependent.
for x in xrange(1, num_iters):
result = self.execute_query(query_string)
assert(len(result.data) == 2)
# A separate class has been created for "test_hdfs_caching_fallback_path" to make it
# run as a part of exhaustive tests which require the workload to be 'functional-query'.
# TODO: Move this to TestHdfsCaching once we make exhaustive tests run for other workloads
@SkipIfS3.caching
@SkipIfABFS.caching
@SkipIfADLS.caching
@SkipIfIsilon.caching
@SkipIfLocal.caching
class TestHdfsCachingFallbackPath(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@SkipIfS3.hdfs_encryption
@SkipIfABFS.hdfs_encryption
@SkipIfADLS.hdfs_encryption
@SkipIfIsilon.hdfs_encryption
@SkipIfLocal.hdfs_encryption
def test_hdfs_caching_fallback_path(self, vector, unique_database, testid_checksum):
""" This tests the code path of the query execution where the hdfs cache read fails
and the execution falls back to the normal read path. To reproduce this situation we
rely on IMPALA-3679, where zcrs are not supported with encryption zones. This makes
sure ReadFromCache() fails and falls back to ReadRange() to read the scan range."""
if self.exploration_strategy() != 'exhaustive' or\
vector.get_value('table_format').file_format != 'text':
pytest.skip()
# Create a new encryption zone and copy the tpch.nation table data into it.
encrypted_table_dir = get_fs_path("/test-warehouse/" + testid_checksum)
create_query_sql = "CREATE EXTERNAL TABLE %s.cached_nation like tpch.nation "\
"LOCATION '%s'" % (unique_database, encrypted_table_dir)
check_call(["hdfs", "dfs", "-mkdir", encrypted_table_dir], shell=False)
check_call(["hdfs", "crypto", "-createZone", "-keyName", "testKey1", "-path",\
encrypted_table_dir], shell=False)
check_call(["hdfs", "dfs", "-cp", get_fs_path("/test-warehouse/tpch.nation/*.tbl"),\
encrypted_table_dir], shell=False)
# Reduce the scan range size to force the query to have multiple scan ranges.
exec_options = vector.get_value('exec_option')
exec_options['max_scan_range_length'] = 1024
try:
self.execute_query_expect_success(self.client, create_query_sql)
# Cache the table data
self.execute_query_expect_success(self.client, "ALTER TABLE %s.cached_nation set "
"cached in 'testPool'" % unique_database)
# Wait till the whole path is cached. We set a deadline of 20 seconds for the path
# to be cached to make sure this doesn't loop forever in case of caching errors.
caching_deadline = time.time() + 20
while not is_path_fully_cached(encrypted_table_dir):
if time.time() > caching_deadline:
pytest.fail("Timed out caching path: " + encrypted_table_dir)
time.sleep(2)
self.execute_query_expect_success(self.client, "invalidate metadata "
"%s.cached_nation" % unique_database);
result = self.execute_query_expect_success(self.client, "select count(*) from "
"%s.cached_nation" % unique_database, exec_options)
assert(len(result.data) == 1)
assert(result.data[0] == '25')
except Exception as e:
pytest.fail("Failure in test_hdfs_caching_fallback_path: " + str(e))
finally:
check_call(["hdfs", "dfs", "-rm", "-r", "-f", "-skipTrash", encrypted_table_dir],\
shell=False)
@SkipIfS3.caching
@SkipIfABFS.caching
@SkipIfADLS.caching
@SkipIfIsilon.caching
@SkipIfLocal.caching
@SkipIfCatalogV2.hdfs_caching_ddl_unsupported()
class TestHdfsCachingDdl(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestHdfsCachingDdl, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format == 'text' and \
v.get_value('table_format').compression_codec == 'none')
def setup_method(self, method):
self.cleanup_db("cachedb")
self.client.execute("create database cachedb")
def teardown_method(self, method):
self.cleanup_db("cachedb")
@pytest.mark.execute_serially
@SkipIfDockerizedCluster.accesses_host_filesystem
def test_caching_ddl(self, vector):
# Get the number of cache requests before starting the test
num_entries_pre = get_num_cache_requests()
self.run_test_case('QueryTest/hdfs-caching', vector)
# After running this test case we should be left with 10 cache requests.
# In this case, 1 for each table + 7 more for each cached partition + 1
# for the table with partitions on both HDFS and local file system.
assert num_entries_pre == get_num_cache_requests() - 10
self.client.execute("drop table cachedb.cached_tbl_part")
self.client.execute("drop table cachedb.cached_tbl_nopart")
self.client.execute("drop table cachedb.cached_tbl_local")
self.client.execute("drop table cachedb.cached_tbl_ttl")
# Dropping the tables should cleanup cache entries leaving us with the same
# total number of entries.
assert num_entries_pre == get_num_cache_requests()
@pytest.mark.execute_serially
def test_caching_ddl_drop_database(self, vector):
"""IMPALA-2518: DROP DATABASE CASCADE should properly drop all impacted cache
directives"""
num_entries_pre = get_num_cache_requests()
# Populates the `cachedb` database with some cached tables and partitions
self.client.execute("use cachedb")
self.client.execute("create table cached_tbl_nopart (i int) cached in 'testPool'")
self.client.execute("insert into cached_tbl_nopart select 1")
self.client.execute("create table cached_tbl_part (i int) partitioned by (j int) \
cached in 'testPool'")
self.client.execute("insert into cached_tbl_part (i,j) select 1, 2")
# We expect the number of cached entities to grow
assert num_entries_pre < get_num_cache_requests()
self.client.execute("use default")
self.client.execute("drop database cachedb cascade")
# We want to see the number of cached entities return to the original count
assert num_entries_pre == get_num_cache_requests()
@pytest.mark.execute_serially
def test_cache_reload_validation(self, vector):
"""This is a set of tests asserting that cache directives modified
outside of Impala are picked up after reload, cf IMPALA-1645"""
num_entries_pre = get_num_cache_requests()
create_table = ("create table cachedb.cached_tbl_reload "
"(id int) cached in 'testPool' with replication = 8")
self.client.execute(create_table)
# Access the table once to load the metadata
self.client.execute("select count(*) from cachedb.cached_tbl_reload")
create_table = ("create table cachedb.cached_tbl_reload_part (i int) "
"partitioned by (j int) cached in 'testPool' with replication = 8")
self.client.execute(create_table)
# Add two partitions
self.client.execute("alter table cachedb.cached_tbl_reload_part add partition (j=1)")
self.client.execute("alter table cachedb.cached_tbl_reload_part add partition (j=2)")
assert num_entries_pre + 4 == get_num_cache_requests(), \
"Adding the tables should be reflected by the number of cache directives."
# Modify the cache directive outside of Impala and reload the table to verify
# that changes are visible
drop_cache_directives_for_path("/test-warehouse/cachedb.db/cached_tbl_reload")
drop_cache_directives_for_path("/test-warehouse/cachedb.db/cached_tbl_reload_part")
drop_cache_directives_for_path(
"/test-warehouse/cachedb.db/cached_tbl_reload_part/j=1")
change_cache_directive_repl_for_path(
"/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2", 3)
# Create a bogus cached table abusing an existing cache directive ID, IMPALA-1750
dirid = get_cache_directive_for_path("/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")
self.client.execute(("create table cachedb.no_replication_factor (id int) " \
"tblproperties(\"cache_directive_id\"=\"%s\")" % dirid))
self.run_test_case('QueryTest/hdfs-caching-validation', vector)
# Temp fix for IMPALA-2510. Due to IMPALA-2518, when the test database is dropped,
# the cache directives are not removed for table 'cached_tbl_reload_part'.
drop_cache_directives_for_path(
"/test-warehouse/cachedb.db/cached_tbl_reload_part/j=2")
@pytest.mark.execute_serially
def test_external_drop(self):
"""IMPALA-3040: Tests that dropping a partition in Hive leads to the removal of the
cache directive after a refresh statement in Impala."""
num_entries_pre = get_num_cache_requests()
self.client.execute("use cachedb")
self.client.execute("create table test_external_drop_tbl (i int) partitioned by "
"(j int) cached in 'testPool'")
self.client.execute("insert into test_external_drop_tbl (i,j) select 1, 2")
# 1 directive for the table and 1 directive for the partition.
assert num_entries_pre + 2 == get_num_cache_requests()
self.hive_client.drop_partition("cachedb", "test_external_drop_tbl", ["2"], True)
self.client.execute("refresh test_external_drop_tbl")
# The directive on the partition is removed.
assert num_entries_pre + 1 == get_num_cache_requests()
self.client.execute("drop table test_external_drop_tbl")
# We want to see the number of cached entities return to the original count.
assert num_entries_pre == get_num_cache_requests()
def drop_cache_directives_for_path(path):
"""Drop the cache directive for a given path"""
rc, stdout, stderr = exec_process("hdfs cacheadmin -removeDirectives -path %s" % path)
assert rc == 0, \
"Error removing cache directive for path %s (%s, %s)" % (path, stdout, stderr)
def is_path_fully_cached(path):
"""Returns true if all the bytes of the path are cached, false otherwise"""
rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -stats -path %s" % path)
assert rc == 0
caching_stats = stdout.strip("\n").split("\n")[-1].split()
# Compare BYTES_NEEDED and BYTES_CACHED, the output format is as follows
# "ID POOL REPL EXPIRY PATH BYTES_NEEDED BYTES_CACHED FILES_NEEDED FILES_CACHED"
return len(caching_stats) > 0 and caching_stats[5] == caching_stats[6]
def get_cache_directive_for_path(path):
rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -path %s" % path)
assert rc == 0
dirid = re.search('^\s+?(\d+)\s+?testPool\s+?.*?$', stdout, re.MULTILINE).group(1)
return dirid
def change_cache_directive_repl_for_path(path, repl):
"""Drop the cache directive for a given path"""
dirid = get_cache_directive_for_path(path)
rc, stdout, stderr = exec_process(
"hdfs cacheadmin -modifyDirective -id %s -replication %s" % (dirid, repl))
assert rc == 0, \
"Error modifying cache directive for path %s (%s, %s)" % (path, stdout, stderr)
def get_num_cache_requests():
"""Returns the number of outstanding cache requests. Due to race conditions in the
way cache requests are added/dropped/reported (see IMPALA-3040), this function tries
to return a stable result by making several attempts to stabilize it within a
reasonable timeout."""
def get_num_cache_requests_util():
rc, stdout, stderr = exec_process("hdfs cacheadmin -listDirectives -stats")
assert rc == 0, 'Error executing hdfs cacheadmin: %s %s' % (stdout, stderr)
return len(stdout.split('\n'))
# IMPALA-3040: This can take time, especially under slow builds like ASAN.
wait_time_in_sec = build_flavor_timeout(5, slow_build_timeout=20)
num_stabilization_attempts = 0
max_num_stabilization_attempts = 10
new_requests = None
num_requests = None
LOG.info("{0} Entered get_num_cache_requests()".format(time.time()))
while num_stabilization_attempts < max_num_stabilization_attempts:
new_requests = get_num_cache_requests_util()
if new_requests == num_requests: break
LOG.info("{0} Waiting to stabilise: num_requests={1} new_requests={2}".format(
time.time(), num_requests, new_requests))
num_requests = new_requests
num_stabilization_attempts = num_stabilization_attempts + 1
time.sleep(wait_time_in_sec)
LOG.info("{0} Final num requests: {1}".format(time.time(), num_requests))
return num_requests