# blob: bdcfb7bc8c0ab1d79c67226d41dfb3ea74663fc1 (gitiles scrape residue, kept as comment)
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import pytest
from time import sleep

from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfLocal, SkipIfEC
from tests.util.filesystem_utils import (
    IS_ADLS,
    IS_ISILON,
    IS_LOCAL,
    IS_S3)
class TestHdfsFdCaching(CustomClusterTestSuite):
  """Tests that if HDFS file handle caching is enabled, file handles are actually cached
  and the associated metrics return valid results. In addition, tests that the upper bound
  of cached file handles is respected."""

  # Rows written per insert; each insert below produces one new file.
  NUM_ROWS = 100
  INSERT_TPL = "insert into cachefd.simple values"

  def get_workload(self):
    return 'functional-query'

  def create_n_files(self, n):
    """Creates 'n' files by performing 'n' inserts with NUM_ROWS rows."""
    values = ", ".join(["({0},{0},{0})".format(x) for x in range(self.NUM_ROWS)])
    for _ in range(n):
      self.client.execute(self.INSERT_TPL + values)

  def setup_method(self, method):
    """Creates a fresh 'cachefd' database with a single parquet table before each test."""
    super(TestHdfsFdCaching, self).setup_method(method)
    impalad = self.cluster.impalads[0]
    client = impalad.service.create_beeswax_client()
    self.client = client
    client.execute("drop database if exists cachefd cascade")
    client.execute("create database cachefd")
    client.execute("create table cachefd.simple(id int, col1 int, col2 int) "
                   "stored as parquet")

  def teardown_method(self, method):
    """Drops the test database created by setup_method."""
    super(TestHdfsFdCaching, self).teardown_method(method)
    self.client.execute("drop database if exists cachefd cascade")

  def run_fd_caching_test(self, vector, caching_expected, cache_capacity,
                          eviction_timeout_secs):
    """
    Tests that HDFS file handles are cached as expected. This is used both
    for the positive and negative test cases. If caching_expected is true,
    this verifies that the cache adheres to the specified capacity. Also,
    repeated queries across the same files reuse the file handles.
    If caching_expected is false, it verifies that the cache does not
    change in size while running queries.
    If eviction_timeout_secs is not None (and caching is expected), also
    verifies that unused handles are evicted after the timeout elapses.
    """

    # Maximum number of file handles cached (applies whether caching expected
    # or not)
    assert self.max_cached_handles() <= cache_capacity

    num_handles_start = self.cached_handles()

    # The table has one file. If caching is expected, there should be one more
    # handle cached after the first select. If caching is not expected, the
    # number of handles should not change from the initial number.
    self.execute_query("select * from cachefd.simple", vector=vector)
    num_handles_after = self.cached_handles()
    assert self.max_cached_handles() <= cache_capacity
    if caching_expected:
      assert num_handles_after == (num_handles_start + 1)
    else:
      assert num_handles_after == num_handles_start

    # No open handles if scanning is finished
    assert self.outstanding_handles() == 0

    # No change when reading the table again
    for x in range(10):
      self.execute_query("select * from cachefd.simple", vector=vector)
      assert self.cached_handles() == num_handles_after
      assert self.max_cached_handles() <= cache_capacity
      assert self.outstanding_handles() == 0

    # Create more files. This means there are more files than the cache size.
    # The cache size should still be enforced.
    self.create_n_files(cache_capacity + 100)

    # Read all the files of the table and make sure no FD leak
    for x in range(10):
      self.execute_query("select count(*) from cachefd.simple;", vector=vector)
      assert self.max_cached_handles() <= cache_capacity
      if not caching_expected:
        assert self.cached_handles() == num_handles_start
    assert self.outstanding_handles() == 0

    if caching_expected and eviction_timeout_secs is not None:
      # To test unused file handle eviction, sleep for longer than the timeout.
      # All the cached handles should be evicted.
      sleep(eviction_timeout_secs + 5)
      assert self.cached_handles() == 0

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--max_cached_file_handles=16 " +
      " --unused_file_handle_timeout_sec=18446744073709551600")
  def test_caching_enabled(self, vector):
    """
    Test of the HDFS file handle cache with the parameter specified and a very
    large file handle timeout
    """
    cache_capacity = 16
    # Caching applies to HDFS and S3 files. If this is HDFS or S3, then verify
    # that caching works. Otherwise, verify that file handles are not cached.
    # NOTE(review): the original condition was lost in the scrape; reconstructed
    # from the comment above -- confirm against upstream Impala.
    if IS_ADLS or IS_ISILON or IS_LOCAL:
      caching_expected = False
    else:
      caching_expected = True
    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      impalad_args="--max_cached_file_handles=16 --unused_file_handle_timeout_sec=5")
  def test_caching_with_eviction(self, vector):
    """Test of the HDFS file handle cache with unused file handle eviction enabled"""
    cache_capacity = 16
    # Must match --unused_file_handle_timeout_sec in the decorator above.
    handle_timeout = 5
    # Only test eviction on platforms where caching is enabled.
    caching_expected = True
    self.run_fd_caching_test(vector, caching_expected, cache_capacity, handle_timeout)

  @pytest.mark.execute_serially
  @CustomClusterTestSuite.with_args(
      # NOTE(review): decorator missing from the scraped source; a zero capacity
      # matches cache_capacity = 0 below -- confirm against upstream Impala.
      impalad_args="--max_cached_file_handles=0")
  def test_caching_disabled_by_param(self, vector):
    """Test that the HDFS file handle cache is disabled when the parameter is zero"""
    cache_capacity = 0
    caching_expected = False
    self.run_fd_caching_test(vector, caching_expected, cache_capacity, None)

  # NOTE(review): the metric-key string literals were blank in the scraped
  # source; the keys below are Impala's IoMgr file-handle-cache metric names --
  # confirm against be/src/util/impalad-metrics.cc.
  def cached_handles(self):
    # Cluster-wide total of cached file handles.
    return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles")

  def outstanding_handles(self):
    # Cluster-wide total of file handles currently in use by scans.
    return self.get_agg_metric("impala-server.io.mgr.num-file-handles-outstanding")

  def max_cached_handles(self):
    # Largest per-impalad cached-handle count (max, not sum, across the cluster).
    return self.get_agg_metric("impala-server.io.mgr.num-cached-file-handles", max)

  def get_agg_metric(self, key, fun=sum):
    """Aggregates the metric 'key' over all impalads using 'fun' (default: sum)."""
    cluster = self.cluster
    return fun([s.service.get_metric_value(key) for s in cluster.impalads])