# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import os
import random
import re
import string
import time
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.skip import SkipIfDockerizedCluster, SkipIf
from tests.common.test_dimensions import (
add_exec_option_dimension, add_mandatory_exec_option)
from tests.util.parse_util import (
match_memory_estimate, parse_mem_to_mb, match_cache_key)
TABLE_LAYOUT = 'name STRING, age INT, address STRING'
CACHE_START_ARGS = \
"--tuple_cache_dir=/tmp --tuple_cache_debug_dump_dir=/tmp --log_level=2"
NUM_HITS = 'NumTupleCacheHits'
NUM_HALTED = 'NumTupleCacheHalted'
NUM_SKIPPED = 'NumTupleCacheSkipped'
NUM_CORRECTNESS_VERIFICATION = 'NumTupleCacheCorrectnessVerification'
# Indentation used for TUPLE_CACHE_NODE in specific fragments (not the averaged fragment).
NODE_INDENT = ' - '
# Generates a random table entry of at least 15 bytes.
def table_value(seed):
r = random.Random(seed)
name = "".join([r.choice(string.ascii_letters) for _ in range(r.randint(5, 20))])
age = r.randint(1, 90)
address = "{0} {1}".format(r.randint(1, 9999),
"".join([r.choice(string.ascii_letters) for _ in range(r.randint(4, 12))]))
return '"{0}", {1}, "{2}"'.format(name, age, address)
def getCounterValues(profile, key):
# This matches lines like these:
# NumTupleCacheHits: 1 (1)
# TupleCacheBytesWritten: 123.00 B (123)
# The regex extracts the value inside the parenthesis to get a simple numeric value
# rather than a pretty print of the same value.
counter_str_list = re.findall(r"{0}{1}: .* \((.*)\)".format(NODE_INDENT, key), profile)
return [int(v) for v in counter_str_list]
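# Asserts that the per-node values of 'key' appear in the profile in exactly the
# order given by 'vals'.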
def assertCounterOrder(profile, key, vals):
values = getCounterValues(profile, key)
assert values == vals, values
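# Asserts that the number of nodes reporting 'key' equal to 'val' is one of the
# accepted counts in 'num_matches' (a single int or a list of ints).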
def assertCounter(profile, key, val, num_matches):
if not isinstance(num_matches, list):
num_matches = [num_matches]
values = getCounterValues(profile, key)
assert len([v for v in values if v == val]) in num_matches, values
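# Convenience wrapper asserting the hits, halted, and skipped counters in one call.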
def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1):
assertCounter(profile, NUM_HITS, num_hits, num_matches)
assertCounter(profile, NUM_HALTED, num_halted, num_matches)
assertCounter(profile, NUM_SKIPPED, num_skipped, num_matches)
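# Extracts the "Combined Key" reported by each TUPLE_CACHE_NODE in the profile.
# Keys have the form "<compile-time key>_<fragment instance key>". The parser looks
# for profile lines like these (values are illustrative):
#   TUPLE_CACHE_NODE (id=1)
#     Combined Key: deadbeef_cafe0123
# Returns a dict of node id -> sorted list of keys, or just the list when the
# profile contains a single TUPLE_CACHE_NODE id.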
def get_cache_keys(profile):
cache_keys = {}
last_node_id = -1
matcher = re.compile(r'TUPLE_CACHE_NODE \(id=([0-9]*)\)')
for line in profile.splitlines():
if "Combined Key:" in line:
key = line.split(":")[1].strip()
cache_keys[last_node_id].append(key)
continue
match = matcher.search(line)
if match:
last_node_id = int(match.group(1))
if last_node_id not in cache_keys:
cache_keys[last_node_id] = []
# Sort cache keys: with multiple nodes, order in the profile may change.
for _, val in cache_keys.items():
val.sort()
return next(iter(cache_keys.values())) if len(cache_keys) == 1 else cache_keys
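# With mt_dop > 0, the profile should report deterministic scan range assignment so
# that repeated runs can produce the same fragment instance cache keys.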
def assert_deterministic_scan(vector, profile):
if vector.get_value('exec_option')['mt_dop'] > 0:
assert "deterministic scan range assignment: true" in profile
class TestTupleCacheBase(CustomClusterTestSuite):
@classmethod
def setup_class(cls):
super(TestTupleCacheBase, cls).setup_class()
# Unset this environment variable to ensure it doesn't affect
# tests like test_cache_disabled.
cls.org_tuple_cache_dir = os.getenv("TUPLE_CACHE_DIR")
if cls.org_tuple_cache_dir is not None:
os.unsetenv("TUPLE_CACHE_DIR")
@classmethod
def teardown_class(cls):
if cls.org_tuple_cache_dir is not None:
os.environ["TUPLE_CACHE_DIR"] = cls.org_tuple_cache_dir
super(TestTupleCacheBase, cls).teardown_class()
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheBase, cls).add_test_dimensions()
add_mandatory_exec_option(cls, 'enable_tuple_cache', 'true')
add_mandatory_exec_option(cls, 'tuple_cache_placement_policy', 'all_eligible')
# Generates a table containing at least <scale> KB of data.
def create_table(self, fq_table, scale=1):
self.execute_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT))
# To make the rows distinct, we keep using a different seed for table_value
global_index = 0
for _ in range(scale):
values = [table_value(i) for i in range(global_index, global_index + 70)]
self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, "), (".join(values)))
global_index += 70
# Helper function to get a tuple cache metric from a single impalad.
def get_tuple_cache_metric(self, impalaservice, suffix):
return impalaservice.get_metric_value('impala.tuple-cache.' + suffix)
class TestTupleCacheOptions(TestTupleCacheBase):
"""Tests Impala with different tuple cache startup options."""
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheOptions, cls).add_test_dimensions()
add_mandatory_exec_option(cls, 'mt_dop', 1)
@CustomClusterTestSuite.with_args(cluster_size=1)
def test_cache_disabled(self, vector, unique_database):
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.cache_disabled".format(unique_database)
self.create_table(fq_table)
result1 = self.execute_query("SELECT * from {0}".format(fq_table))
result2 = self.execute_query("SELECT * from {0}".format(fq_table))
assert result1.success
assert result2.success
assert result1.data == result2.data
assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
@CustomClusterTestSuite.with_args(
start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", cluster_size=1,
impalad_args="--cache_force_single_shard")
def test_cache_halted_select(self, vector):
# The cache is set to the minimum cache size, so run a SQL that produces enough
# data to exceed the cache size and halt caching.
self.client.set_configuration(vector.get_value('exec_option'))
big_enough_query = "SELECT o_comment from tpch.orders"
result1 = self.execute_query(big_enough_query)
result2 = self.execute_query(big_enough_query)
assert result1.success
assert result2.success
assert result1.data == result2.data
assertCounters(result1.runtime_profile, num_hits=0, num_halted=1, num_skipped=0)
bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
# This is running on a single node, so there should be a single location where
# TupleCacheBytesWritten exceeds 0.
assert len([v for v in bytes_written if v > 0]) == 1
assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
@CustomClusterTestSuite.with_args(
start_args=CACHE_START_ARGS, cluster_size=1,
impalad_args="--tuple_cache_ignore_query_options=true")
def test_failpoints(self, vector, unique_database):
fq_table = "{0}.failpoints".format(unique_database)
# Scale 20 gets us enough rows to force multiple RowBatches (needed for the
# reader GetNext() cases).
self.create_table(fq_table, scale=20)
query = "SELECT * from {0}".format(fq_table)
def execute_debug(query, action):
exec_options = dict(vector.get_value('exec_option'))
exec_options['debug_action'] = action
return self.execute_query(query, exec_options)
# Fail when writing cache entry. All of these are handled and will not fail the
# query.
# Case 1: fail during Open()
result = execute_debug(query, "TUPLE_FILE_WRITER_OPEN:FAIL@1.0")
assert result.success
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
# Case 2: fail during Write()
result = execute_debug(query, "TUPLE_FILE_WRITER_WRITE:FAIL@1.0")
assert result.success
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
# Case 3: fail during Commit()
result = execute_debug(query, "TUPLE_FILE_WRITER_COMMIT:FAIL@1.0")
assert result.success
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
# Now, successfully add a cache entry
result1 = self.execute_query(query, vector.get_value('exec_option'))
assert result1.success
assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
# Fail when reading a cache entry
# Case 1: fail during Open()
result = execute_debug(query, "TUPLE_FILE_READER_OPEN:FAIL@1.0")
assert result.success
# Do an unordered compare (the rows are unique)
assert set(result.data) == set(result1.data)
# Not a hit
assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1)
# Case 2: fail during the first GetNext() call
result = execute_debug(query, "TUPLE_FILE_READER_FIRST_GETNEXT:FAIL@1.0")
assert result.success
# Do an unordered compare (the rows are unique)
assert set(result.data) == set(result1.data)
# Technically, this is a hit
assertCounters(result.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
# Case 3: fail during the second GetNext() call
# This one must fail for correctness, as it cannot fall back to the child if it
# has already returned cached rows
hit_error = False
try:
result = execute_debug(query, "TUPLE_FILE_READER_SECOND_GETNEXT:FAIL@1.0")
except Exception:
hit_error = True
assert hit_error
@CustomClusterTestSuite.with_args(
start_args=CACHE_START_ARGS, cluster_size=1,
impalad_args='--tuple_cache_exempt_query_options=max_errors,exec_time_limit_s')
def test_custom_exempt_query_options(self, vector, unique_database):
"""Custom list of exempt query options share cache entry"""
fq_table = "{0}.query_options".format(unique_database)
self.create_table(fq_table)
query = "SELECT * from {0}".format(fq_table)
errors_10 = dict(vector.get_value('exec_option'))
errors_10['max_errors'] = '10'
exec_time_limit = dict(vector.get_value('exec_option'))
exec_time_limit['exec_time_limit_s'] = '30'
exempt1 = self.execute_query(query, query_options=errors_10)
exempt2 = self.execute_query(query, query_options=exec_time_limit)
exempt3 = self.execute_query(query, query_options=vector.get_value('exec_option'))
assert exempt1.success
assert exempt2.success
assert exempt1.data == exempt2.data
assert exempt1.data == exempt3.data
assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheSingle(TestTupleCacheBase):
"""Tests Impala with a single executor and mt_dop=1."""
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheSingle, cls).add_test_dimensions()
add_mandatory_exec_option(cls, 'mt_dop', 1)
def test_create_and_select(self, vector, unique_database):
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.create_and_select".format(unique_database)
self.create_table(fq_table)
result1 = self.execute_query("SELECT * from {0}".format(fq_table))
result2 = self.execute_query("SELECT * from {0}".format(fq_table))
assert result1.success
assert result2.success
assert result1.data == result2.data
assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
# Verify that the bytes written by the first profile are the same as the bytes
# read by the second profile.
bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead")
assert sorted(bytes_written) == sorted(bytes_read)
def test_non_exempt_query_options(self, vector, unique_database):
"""Non-exempt query options result in different cache entries"""
fq_table = "{0}.query_options".format(unique_database)
self.create_table(fq_table)
query = "SELECT * from {0}".format(fq_table)
strict_true = dict(vector.get_value('exec_option'))
strict_true['strict_mode'] = 'true'
strict_false = dict(vector.get_value('exec_option'))
strict_false['strict_mode'] = 'false'
noexempt1 = self.execute_query(query, query_options=strict_false)
noexempt2 = self.execute_query(query, query_options=strict_true)
noexempt3 = self.execute_query(query, query_options=strict_false)
noexempt4 = self.execute_query(query, query_options=strict_true)
noexempt5 = self.execute_query(query, query_options=vector.get_value('exec_option'))
assert noexempt1.success
assert noexempt2.success
assert noexempt3.success
assert noexempt4.success
assert noexempt5.success
assert noexempt1.data == noexempt2.data
assert noexempt1.data == noexempt3.data
assert noexempt1.data == noexempt4.data
assert noexempt1.data == noexempt5.data
assertCounters(noexempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
assertCounters(noexempt2.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
assertCounters(noexempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
assertCounters(noexempt4.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
assertCounters(noexempt5.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
def test_exempt_query_options(self, vector, unique_database):
"""Exempt query options share cache entry"""
fq_table = "{0}.query_options".format(unique_database)
self.create_table(fq_table)
query = "SELECT * from {0}".format(fq_table)
codegen_false = dict(vector.get_value('exec_option'))
codegen_false['disable_codegen'] = 'true'
codegen_true = dict(vector.get_value('exec_option'))
codegen_true['disable_codegen'] = 'false'
exempt1 = self.execute_query(query, query_options=codegen_true)
exempt2 = self.execute_query(query, query_options=codegen_false)
exempt3 = self.execute_query(query, query_options=vector.get_value('exec_option'))
assert exempt1.success
assert exempt2.success
assert exempt1.data == exempt2.data
assert exempt1.data == exempt3.data
assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0)
assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0)
def test_aggregate(self, vector, unique_database):
"""Simple aggregation can be cached"""
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.agg".format(unique_database)
self.create_table(fq_table)
result1 = self.execute_query("SELECT sum(age) FROM {0}".format(fq_table))
result2 = self.execute_query("SELECT sum(age) FROM {0}".format(fq_table))
assert result1.success
assert result2.success
assert result1.data == result2.data
assertCounters(result1.runtime_profile, 0, 0, 0, num_matches=2)
# Aggregate should hit, and scan node below it will miss.
assertCounterOrder(result2.runtime_profile, NUM_HITS, [1, 0])
assertCounter(result2.runtime_profile, NUM_HALTED, 0, num_matches=2)
assertCounter(result2.runtime_profile, NUM_SKIPPED, 0, num_matches=2)
# Verify that the bytes written by the first profile are the same as the bytes
# read by the second profile.
bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten")
bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead")
assert len(bytes_written) == 2
assert len(bytes_read) == 1
assert bytes_written[0] == bytes_read[0]
def test_aggregate_reuse(self, vector):
"""Cached aggregation can be re-used"""
self.client.set_configuration(vector.get_value('exec_option'))
result = self.execute_query("SELECT sum(int_col) FROM functional.alltypes")
assert result.success
assertCounters(result.runtime_profile, 0, 0, 0, num_matches=2)
result_scan = self.execute_query("SELECT avg(int_col) FROM functional.alltypes")
assert result_scan.success
assertCounterOrder(result_scan.runtime_profile, NUM_HITS, [0, 1])
result_agg = self.execute_query(
"SELECT avg(a) FROM (SELECT sum(int_col) as a FROM functional.alltypes) b")
assert result_agg.success
assertCounterOrder(result_agg.runtime_profile, NUM_HITS, [1, 0])
def test_parquet_resolution_by_name(self, vector, unique_database):
"""Verify that parquet_fallback_schema_resolution=NAME works with tuple caching"""
self.run_test_case('QueryTest/parquet-resolution-by-name', vector,
use_db=unique_database)
def test_partition_information(self, vector):
"""Verify that partition information is incorporated into the runtime cache key"""
self.client.set_configuration(vector.get_value('exec_option'))
# scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where all
# the partitions point to the same filesystem location. A single file is read many
# times for different partitions. It is not possible to tell the partitions apart
# by the file path, so this verifies that the partition information is being included
# properly.
query_template = \
"select i, j from scale_db.num_partitions_1234_blocks_per_partition_1 where j={0}"
# Run against the j=1 partition
result1 = self.execute_query(query_template.format(1))
assert result1.success
assertCounters(result1.runtime_profile, 0, 0, 0)
assert len(result1.data) == 1
assert result1.data[0].split("\t") == ["1", "1"]
# Run against the j=2 partition. There should not be a cache hit, because they are
# running against different partitions. This only works if the runtime key
# incorporates the partition information.
result2 = self.execute_query(query_template.format(2))
assert result2.success
assertCounters(result2.runtime_profile, 0, 0, 0)
assert len(result2.data) == 1
assert result2.data[0].split("\t") == ["1", "2"]
def test_json_binary_format(self, vector, unique_database):
"""This is identical to test_scanners.py's TestBinaryType::test_json_binary_format.
That test modifies a table's serde properties to change the json binary format.
The tuple cache detects that by including the partition's storage descriptor
information. This fails if that doesn't happen."""
test_tbl = unique_database + '.binary_tbl'
self.clone_table('functional_json.binary_tbl', test_tbl, False, vector)
self.run_test_case('QueryTest/json-binary-format', vector, unique_database)
def test_complex_types_verification(self, vector):
"""Run with correctness verification and check that it works with a query that
selects complex types."""
# We use custom query options to turn on verification. We also need to use
# expand_complex_types=true so that * includes columns with complex types
custom_options = dict(vector.get_value('exec_option'))
custom_options['enable_tuple_cache_verification'] = 'true'
custom_options['expand_complex_types'] = 'true'
# functional_parquet.complextypestbl has multiple columns with different types
# of complex types. e.g. nested_struct is a struct with multiple nested fields.
query = "select * from functional_parquet.complextypestbl"
result1 = self.execute_query(query, query_options=custom_options)
assert result1.success
assertCounters(result1.runtime_profile, 0, 0, 0)
# The second run is when correctness verification kicks in and tests the printing
# logic.
result2 = self.execute_query(query, query_options=custom_options)
assert result2.success
# The regular counters see this as a skip
assertCounters(result2.runtime_profile, 0, 0, 1)
assertCounter(result2.runtime_profile, NUM_CORRECTNESS_VERIFICATION, 1, 1)
# ORDER BY is currently not supported for complex type results, so sort the results
# before comparing them.
assert sorted(result1.data) == sorted(result2.data)
@CustomClusterTestSuite.with_args(
start_args=CACHE_START_ARGS,
impalad_args="--use_local_catalog=false",
catalogd_args="--catalog_topic_mode=full")
class TestTupleCacheCluster(TestTupleCacheBase):
"""Tests Impala with 3 executors and mt_dop=1."""
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheCluster, cls).add_test_dimensions()
add_mandatory_exec_option(cls, 'mt_dop', 1)
def test_runtime_filters(self, vector, unique_database):
"""
This tests that adding files to a table results in different runtime filter keys.
The last assertions after 'invalidate metadata' only hold if the Impala cluster is
in legacy catalog mode.
"""
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.runtime_filters".format(unique_database)
# A query containing multiple runtime filters
# - scan of A receives runtime filters from B and C, so it depends on contents of B/C
# - scan of B receives runtime filter from C, so it depends on contents of C
query = "select straight_join a.id from functional.alltypes a, functional.alltypes" \
" b, {0} c where a.id = b.id and a.id = c.age order by a.id".format(fq_table)
query_a_id = 10
query_b_id = 11
query_c_id = 12
# Create an empty table
self.create_table(fq_table, scale=0)
# Establish a baseline
empty_result = self.execute_query(query)
empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
# Tables a and b have multiple files, so they are distributed across all 3 nodes.
# Table c has one file, so it has a single entry.
assert len(empty_cache_keys) == 3
assert len(empty_cache_keys[query_c_id]) == 1
empty_c_compile_key, empty_c_finst_key = empty_cache_keys[query_c_id][0].split("_")
assert empty_c_finst_key == "0"
assert len(empty_result.data) == 0
# Insert a row, which creates a file / scan range
self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(0)))
# Now, there is a scan range, so the fragment instance key should be non-zero.
one_file_result = self.execute_query(query)
one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
assert len(one_cache_keys) == 3
assert len(one_cache_keys[query_c_id]) == 1
one_c_compile_key, one_c_finst_key = one_cache_keys[query_c_id][0].split("_")
assert one_c_finst_key != "0"
# This should be a cache miss
assertCounters(one_file_result.runtime_profile, 0, 0, 0, 7)
assert len(one_file_result.data) == 1
# The new scan range did not change the compile-time key, but did change the runtime
# filter keys.
for id in [query_a_id, query_b_id]:
assert len(empty_cache_keys[id]) == len(one_cache_keys[id])
for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]):
assert empty != one
assert empty_c_compile_key == one_c_compile_key
# Insert another row, which creates a file / scan range
self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(1)))
# There is a second scan range, so the fragment instance key should change again
two_files_result = self.execute_query(query)
two_cache_keys = get_cache_keys(two_files_result.runtime_profile)
assert len(two_cache_keys) == 3
assert len(two_cache_keys[query_c_id]) == 2
two_c1_compile_key, two_c1_finst_key = two_cache_keys[query_c_id][0].split("_")
two_c2_compile_key, two_c2_finst_key = two_cache_keys[query_c_id][1].split("_")
assert two_c1_finst_key != "0"
assert two_c2_finst_key != "0"
# There may be a cache hit for the prior "c" scan range (if scheduled to the same
# instance), and the rest cache misses.
assertCounter(two_files_result.runtime_profile, NUM_HITS, 0, num_matches=[7, 8])
assertCounter(two_files_result.runtime_profile, NUM_HITS, 1, num_matches=[0, 1])
assertCounter(two_files_result.runtime_profile, NUM_HALTED, 0, num_matches=8)
assertCounter(two_files_result.runtime_profile, NUM_SKIPPED, 0, num_matches=8)
assert len(two_files_result.data) == 2
# Ordering can vary by environment. Ensure one matches and one differs.
assert one_c_finst_key == two_c1_finst_key or one_c_finst_key == two_c2_finst_key
assert one_c_finst_key != two_c1_finst_key or one_c_finst_key != two_c2_finst_key
overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data))
assert len(overlapping_rows) == 1
# The new scan range did not change the compile-time key, but did change the runtime
# filter keys.
for id in [query_a_id, query_b_id]:
assert len(one_cache_keys[id]) == len(two_cache_keys[id])
for one, two in zip(one_cache_keys[id], two_cache_keys[id]):
assert one != two
assert one_c_compile_key == two_c1_compile_key
assert one_c_compile_key == two_c2_compile_key
# Invalidate metadata and rerun the last query. The keys should stay the same.
self.execute_query("invalidate metadata")
rerun_two_files_result = self.execute_query(query)
# Verify that this is a cache hit
assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0, num_matches=8)
rerun_cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
assert rerun_cache_keys == two_cache_keys
assert rerun_two_files_result.data == two_files_result.data
def test_runtime_filter_reload(self, vector, unique_database):
"""
This tests that dropping and recreating a table over the same files results in
matching runtime filter keys.
"""
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.runtime_filter_genspec".format(unique_database)
# Query where fq_table generates a runtime filter.
query = "select straight_join a.id from functional.alltypes a, {0} b " \
"where a.id = b.age order by a.id".format(fq_table)
# Create a partitioned table with 3 partitions
self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING) "
"PARTITIONED BY (age INT)".format(fq_table))
self.execute_query(
"INSERT INTO {0} PARTITION(age=4) VALUES (\"Vanessa\")".format(fq_table))
self.execute_query(
"INSERT INTO {0} PARTITION(age=5) VALUES (\"Carl\")".format(fq_table))
self.execute_query(
"INSERT INTO {0} PARTITION(age=6) VALUES (\"Cleopatra\")".format(fq_table))
# Prime the cache
base_result = self.execute_query(query)
base_cache_keys = get_cache_keys(base_result.runtime_profile)
assert len(base_cache_keys) == 3
# Drop and reload the table
self.execute_query("DROP TABLE {0}".format(fq_table))
self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING, address STRING) "
"PARTITIONED BY (age INT)".format(fq_table))
self.execute_query("ALTER TABLE {0} RECOVER PARTITIONS".format(fq_table))
# Verify we reuse the cache
reload_result = self.execute_query(query)
reload_cache_keys = get_cache_keys(reload_result.runtime_profile)
assert base_result.data == reload_result.data
assert base_cache_keys == reload_cache_keys
# Skip verifying cache hits, as fragments may not be assigned to the same nodes.
def test_join_modifications(self, vector, unique_database):
"""
This tests caching above a join without runtime filters and verifies that changes
to the build side table result in a different cache key.
"""
fq_table = "{0}.join_modifications".format(unique_database)
query = "select straight_join probe.id from functional.alltypes probe join " \
"/* +broadcast */ {0} build on (probe.id = build.age) ".format(fq_table) + \
"order by probe.id"
# Create an empty table
self.create_table(fq_table, scale=0)
probe_id = 6
build_id = 7
above_join_id = 8
# Run without runtime filters to verify the regular path works
no_runtime_filters = dict(vector.get_value('exec_option'))
no_runtime_filters['runtime_filter_mode'] = 'off'
# Establish a baseline
empty_result = self.execute_query(query, no_runtime_filters)
empty_cache_keys = get_cache_keys(empty_result.runtime_profile)
# The build side is on one node. The probe side is on three nodes.
assert len(empty_cache_keys) == 3
assert len(empty_cache_keys[probe_id]) == 3
assert len(empty_cache_keys[build_id]) == 1
assert len(empty_cache_keys[above_join_id]) == 3
empty_build_key = empty_cache_keys[build_id][0]
empty_build_compile_key, empty_build_finst_key = empty_build_key.split("_")
assert empty_build_finst_key == "0"
assert len(empty_result.data) == 0
empty_join_compile_key = empty_cache_keys[above_join_id][0].split("_")[0]
# Insert a row, which creates a file / scan range
self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(0)))
# There is a build-side scan range, so the fragment instance key should be non-zero.
one_file_result = self.execute_query(query, no_runtime_filters)
assert len(one_file_result.data) == 1
one_cache_keys = get_cache_keys(one_file_result.runtime_profile)
assert len(one_cache_keys) == 3
assert len(one_cache_keys[probe_id]) == 3
assert len(one_cache_keys[build_id]) == 1
assert len(one_cache_keys[above_join_id]) == 3
one_build_key = one_cache_keys[build_id][0]
one_build_compile_key, one_build_finst_key = one_build_key.split("_")
assert one_build_finst_key != "0"
assert one_build_compile_key == empty_build_compile_key
# This should be a cache miss for the build side and above the join, but a cache
# hit for the probe side (3 instances).
assertCounter(one_file_result.runtime_profile, NUM_HITS, 1, 3)
assertCounter(one_file_result.runtime_profile, NUM_HALTED, 0, 7)
assertCounter(one_file_result.runtime_profile, NUM_SKIPPED, 0, 7)
# The above join compile time key should have changed, because it incorporates the
# build side scan ranges.
one_join_compile_key = one_cache_keys[above_join_id][0].split("_")[0]
assert one_join_compile_key != empty_join_compile_key
def test_join_timing(self, vector):
"""
This verifies that a very short query with a cache hit above a join can complete
below a certain threshold. This should be sensitive to synchronization issues
with the shared join builder.
"""
query = "select straight_join probe.id from functional.alltypes probe join " \
"/* +broadcast */ functional.alltypes build on (probe.id = build.id) " \
"order by probe.id"
# To avoid interaction with cache entries from previous tests, set an unrelated
# query option to keep the key different.
custom_options = dict(vector.get_value('exec_option'))
custom_options['batch_size'] = '1234'
first_run_result = self.execute_query(query, custom_options)
assert len(first_run_result.data) == 7300
assertCounter(first_run_result.runtime_profile, NUM_HITS, 0, 9)
assertCounter(first_run_result.runtime_profile, NUM_HALTED, 0, 9)
assertCounter(first_run_result.runtime_profile, NUM_SKIPPED, 0, 9)
start_time = time.time()
second_run_result = self.execute_query(query, custom_options)
end_time = time.time()
# The location above the join hits and the location on the build side hits,
# but the probe location is below the join and doesn't hit.
assertCounter(second_run_result.runtime_profile, NUM_HITS, 1, 6)
assertCounter(second_run_result.runtime_profile, NUM_HALTED, 0, 9)
assertCounter(second_run_result.runtime_profile, NUM_SKIPPED, 0, 9)
# As a sanity check for the synchronization pieces, verify that this runs in less
# than 750 milliseconds.
assert end_time - start_time < 0.75
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase):
"""Simpler tests that run on a single node with mt_dop=0 or mt_dop=1."""
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheRuntimeKeysBasic, cls).add_test_dimensions()
add_exec_option_dimension(cls, 'mt_dop', [0, 1])
def test_scan_range_basics(self, vector, unique_database):
"""
This tests that adding files to a table results in different cache keys.
"""
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.scan_range_basics".format(unique_database)
query = "SELECT * from {0}".format(fq_table)
# Create an empty table
self.create_table(fq_table, scale=0)
# When there are no scan ranges, the fragment instance key is 0. This is
# somewhat of a toy case and we probably want to avoid caching in this
# case. Nonetheless, it is a good sanity check.
empty_result = self.execute_query(query)
cache_keys = get_cache_keys(empty_result.runtime_profile)
assert len(cache_keys) == 1
empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_")
assert empty_table_finst_key == "0"
assert len(empty_result.data) == 0
assert_deterministic_scan(vector, empty_result.runtime_profile)
# Insert a row, which creates a file / scan range
self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(0)))
# Now, there is a scan range, so the fragment instance key should be non-zero.
one_file_result = self.execute_query(query)
cache_keys = get_cache_keys(one_file_result.runtime_profile)
assert len(cache_keys) == 1
one_file_compile_key, one_file_finst_key = cache_keys[0].split("_")
assert one_file_finst_key != "0"
# This should be a cache miss
assertCounters(one_file_result.runtime_profile, 0, 0, 0)
assert len(one_file_result.data) == 1
assert_deterministic_scan(vector, one_file_result.runtime_profile)
# The new scan range did not change the compile-time key
assert empty_table_compile_key == one_file_compile_key
# Insert another row, which creates a file / scan range
self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(1)))
# There is a second scan range, so the fragment instance key should change again
two_files_result = self.execute_query(query)
cache_keys = get_cache_keys(two_files_result.runtime_profile)
assert len(cache_keys) == 1
two_files_compile_key, two_files_finst_key = cache_keys[0].split("_")
assert two_files_finst_key != "0"
assertCounters(two_files_result.runtime_profile, 0, 0, 0)
assert len(two_files_result.data) == 2
assert one_file_finst_key != two_files_finst_key
overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data))
assert len(overlapping_rows) == 1
assert_deterministic_scan(vector, two_files_result.runtime_profile)
# The new scan range did not change the compile-time key
assert one_file_compile_key == two_files_compile_key
# Invalidate metadata and rerun the last query. The keys should stay the same.
self.execute_query("invalidate metadata")
rerun_two_files_result = self.execute_query(query)
# Verify that this is a cache hit
assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0)
cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile)
assert len(cache_keys) == 1
rerun_two_files_compile_key, rerun_two_files_finst_key = cache_keys[0].split("_")
assert rerun_two_files_finst_key == two_files_finst_key
assert rerun_two_files_compile_key == two_files_compile_key
assert rerun_two_files_result.data == two_files_result.data
def test_scan_range_partitioned(self, vector):
"""
This tests a basic partitioned case where the query is identical except that
it operates on different partitions (and thus different scan ranges).
"""
self.client.set_configuration(vector.get_value('exec_option'))
year2009_result = self.execute_query(
"select * from functional.alltypes where year=2009")
cache_keys = get_cache_keys(year2009_result.runtime_profile)
assert len(cache_keys) == 1
year2009_compile_key, year2009_finst_key = cache_keys[0].split("_")
year2010_result = self.execute_query(
"select * from functional.alltypes where year=2010")
cache_keys = get_cache_keys(year2010_result.runtime_profile)
assert len(cache_keys) == 1
year2010_compile_key, year2010_finst_key = cache_keys[0].split("_")
# This should be a cache miss
assertCounters(year2010_result.runtime_profile, 0, 0, 0)
# The year=X predicate is on a partition column, so it is enforced by pruning
# partitions and doesn't carry through to execution. The compile keys for
# the two queries are the same, but the fragment instance keys are different due
# to the different scan ranges from different partitions.
assert year2009_compile_key == year2010_compile_key
assert year2009_finst_key != year2010_finst_key
# Verify that the results are completely different
year2009_result_set = set(year2009_result.data)
year2010_result_set = set(year2010_result.data)
overlapping_rows = year2009_result_set.intersection(year2010_result_set)
assert len(overlapping_rows) == 0
assert year2009_result.data[0].find("2009") != -1
assert year2009_result.data[0].find("2010") == -1
assert year2010_result.data[0].find("2010") != -1
assert year2010_result.data[0].find("2009") == -1
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS)
class TestTupleCacheFullCluster(TestTupleCacheBase):
"""Test with 3 executors and a range of mt_dop values."""
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheFullCluster, cls).add_test_dimensions()
add_exec_option_dimension(cls, 'mt_dop', [0, 1, 2])
def test_scan_range_distributed(self, vector, unique_database):
"""
This tests the distributed case where there are multiple fragment instances
processing different scan ranges. Each fragment instance should have a
distinct cache key. When adding a scan range, at least one fragment instance
cache key should change.
"""
self.client.set_configuration(vector.get_value('exec_option'))
mt_dop = vector.get_value('exec_option')['mt_dop']
fq_table = "{0}.scan_range_distributed".format(unique_database)
query = "SELECT * from {0}".format(fq_table)
entries_baseline = {
impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use")
for impalad in self.cluster.impalads}
# Create a table with several files so that we always have enough work for multiple
# fragment instances
self.create_table(fq_table, scale=20)
# We run a simple select. This is running with multiple impalads, so there are
# always multiple fragment instances
before_result = self.execute_query(query)
cache_keys = get_cache_keys(before_result.runtime_profile)
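# 3 impalads in the cluster, each running max(mt_dop, 1) instances of the scan
# fragment.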
expected_num_keys = 3 * max(mt_dop, 1)
assert len(cache_keys) == expected_num_keys
# Every cache key should be distinct, as the fragment instances are processing
# different data
unique_cache_keys = set(cache_keys)
assert len(unique_cache_keys) == expected_num_keys
# Every cache key has the same compile key
unique_compile_keys = set([key.split("_")[0] for key in unique_cache_keys])
assert len(unique_compile_keys) == 1
# Verify the cache metrics for each impalad. Determine number of new cache entries,
# which should be the same as the number of cache keys.
for impalad in self.cluster.impalads:
entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
assert entries_in_use - entries_baseline[impalad] == max(mt_dop, 1)
assert_deterministic_scan(vector, before_result.runtime_profile)
entries_before_insert = {
impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use")
for impalad in self.cluster.impalads}
# Some modification times have coarse granularity (e.g. a second). If the insert runs
# too quickly, the new file could have the same modification time as an existing
# file. In that case, the sort may not place it last, causing unexpected changes to
# the cache keys. Sleep a bit to guarantee a newer modification time.
time.sleep(3)
# Insert another row, which creates a file / scan range
# This uses a very large seed for table_value() to get a unique row that isn't
# already in the table.
self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(1000000)))
# Rerun the query with the extra scan range
after_insert_result = self.execute_query(query)
cache_keys = get_cache_keys(after_insert_result.runtime_profile)
expected_num_keys = 3 * max(mt_dop, 1)
assert len(cache_keys) == expected_num_keys
# Every cache key should be distinct, as the fragment instances are processing
# different data
after_insert_unique_cache_keys = set(cache_keys)
assert len(after_insert_unique_cache_keys) == expected_num_keys
# Every cache key has the same compile key
unique_compile_keys = \
set([key.split("_")[0] for key in after_insert_unique_cache_keys])
assert len(unique_compile_keys) == 1
# Verify the cache metrics. Scheduling scan ranges from oldest to newest makes this
# deterministic. The new file will be scheduled last and will change exactly one
# cache key.
assert len(after_insert_unique_cache_keys - unique_cache_keys) == 1
total_new_entries = 0
for impalad in self.cluster.impalads:
new_entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use")
new_entries_in_use -= entries_before_insert[impalad]
# We're comparing with before the insert, so one node will have a new entry and all
# others will be the same.
assert new_entries_in_use in [0, 1]
total_new_entries += new_entries_in_use
assert total_new_entries == 1
assert_deterministic_scan(vector, after_insert_result.runtime_profile)
# The extra scan range means that at least one fragment instance key changed
# Since scheduling can change completely with the addition of a single scan range,
# we can't assert that only one cache key changes.
changed_cache_keys = unique_cache_keys.symmetric_difference(
after_insert_unique_cache_keys)
assert len(changed_cache_keys) != 0
# Each row is distinct, so that makes it easy to verify that the results overlap
# except the second result contains one more row than the first result.
before_result_set = set(before_result.data)
after_insert_result_set = set(after_insert_result.data)
assert len(before_result_set) == 70 * 20
assert len(before_result_set) + 1 == len(after_insert_result_set)
different_rows = before_result_set.symmetric_difference(after_insert_result_set)
assert len(different_rows) == 1
@SkipIfDockerizedCluster.internal_hostname
@SkipIf.hardcoded_uris
def test_iceberg_deletes(self, vector): # noqa: U100
"""
Test basic Iceberg v2 deletes, which relies on the directed mode and looking
past TupleCacheNodes to find the scan nodes.
"""
# This query tests both equality deletes and positional deletes.
query = "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos " + \
"order by i"
result1 = self.execute_query(query)
result2 = self.execute_query(query)
assert result1.success and result2.success
assert result1.data == result2.data
assert result1.data[0].split("\t") == ["2", "str2_updated", "2023-12-13"]
assert result1.data[1].split("\t") == ["3", "str3", "2023-12-23"]
@CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1)
class TestTupleCacheMtdop(TestTupleCacheBase):
"""Test with single executor and mt_dop=0 or 2."""
@classmethod
def add_test_dimensions(cls):
super(TestTupleCacheMtdop, cls).add_test_dimensions()
add_exec_option_dimension(cls, 'mt_dop', [0, 2])
def test_tuple_cache_count_star(self, vector, unique_database):
"""
This is a regression test for IMPALA-13411; it verifies that running the query
does not hit the DCHECK.
"""
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.tuple_cache_count_star".format(unique_database)
# Create a table.
self.create_table(fq_table, scale=1)
# Run twice and verify it does not hit the DCHECK.
query = "select count(*) from {0}".format(fq_table)
result1 = self.execute_query(query)
result2 = self.execute_query(query)
assert result1.success and result2.success
def test_tuple_cache_key_with_stats(self, vector, unique_database):
"""
This test verifies that COMPUTE STATS does not change the tuple cache key even
though it changes the memory estimate.
"""
self.client.set_configuration(vector.get_value('exec_option'))
fq_table = "{0}.tuple_cache_stats_test".format(unique_database)
# Create a table.
self.create_table(fq_table, scale=1)
# Get the explain text for a simple query.
query = "explain select * from {0}".format(fq_table)
result1 = self.execute_query(query)
# Insert rows to make the stats different.
for i in range(10):
self.execute_query("INSERT INTO {0} VALUES ({1})".format(
fq_table, table_value(i)))
# Run compute stats and get the explain text again for the same query.
self.client.execute("COMPUTE STATS {0}".format(fq_table))
result2 = self.execute_query(query)
# Verify memory estimations are different, while the cache keys are identical.
assert result1.success and result2.success
mem_limit1, units1 = match_memory_estimate(result1.data)
mem_limit1 = parse_mem_to_mb(mem_limit1, units1)
mem_limit2, units2 = match_memory_estimate(result2.data)
mem_limit2 = parse_mem_to_mb(mem_limit2, units2)
assert mem_limit1 != mem_limit2
cache_key1 = match_cache_key(result1.data)
cache_key2 = match_cache_key(result2.data)
assert cache_key1 is not None and cache_key1 == cache_key2