| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| from __future__ import absolute_import, division, print_function |
| |
| import os |
| import random |
| import re |
| import string |
| import time |
| |
| from tests.common.custom_cluster_test_suite import CustomClusterTestSuite |
| from tests.common.skip import SkipIfDockerizedCluster, SkipIf |
| from tests.common.test_dimensions import ( |
| add_exec_option_dimension, add_mandatory_exec_option) |
| from tests.util.parse_util import ( |
| match_memory_estimate, parse_mem_to_mb, match_cache_key) |
| |
| TABLE_LAYOUT = 'name STRING, age INT, address STRING' |
| CACHE_START_ARGS = \ |
| "--tuple_cache_dir=/tmp --tuple_cache_debug_dump_dir=/tmp --log_level=2" |
| NUM_HITS = 'NumTupleCacheHits' |
| NUM_HALTED = 'NumTupleCacheHalted' |
| NUM_SKIPPED = 'NumTupleCacheSkipped' |
| NUM_CORRECTNESS_VERIFICATION = 'NumTupleCacheCorrectnessVerification' |
# Indentation used for TUPLE_CACHE_NODE counters in specific fragments (not the
# averaged fragment).
| NODE_INDENT = ' - ' |
| |
| |
| # Generates a random table entry of at least 15 bytes. |
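# For example, table_value(0) returns a string of the form
# '"AbCdEf", 42, "123 GhIjKl"' (illustrative; actual values vary with the seed).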
| def table_value(seed): |
| r = random.Random(seed) |
| name = "".join([r.choice(string.ascii_letters) for _ in range(r.randint(5, 20))]) |
| age = r.randint(1, 90) |
| address = "{0} {1}".format(r.randint(1, 9999), |
| "".join([r.choice(string.ascii_letters) for _ in range(r.randint(4, 12))])) |
| return '"{0}", {1}, "{2}"'.format(name, age, address) |
| |
| |
| def getCounterValues(profile, key): |
| # This matches lines like these: |
| # NumTupleCacheHits: 1 (1) |
| # TupleCacheBytesWritten: 123.00 B (123) |
  # The regex extracts the value inside the parentheses to get a simple numeric
  # value rather than a pretty print of the same value.
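  # For example, if the profile contains "  - NumTupleCacheHits: 1 (1)", then
  # getCounterValues(profile, NUM_HITS) returns [1].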
| counter_str_list = re.findall(r"{0}{1}: .* \((.*)\)".format(NODE_INDENT, key), profile) |
| return [int(v) for v in counter_str_list] |
| |
| |
| def assertCounterOrder(profile, key, vals): |
| values = getCounterValues(profile, key) |
| assert values == vals, values |
| |
| |
def assertCounter(profile, key, val, num_matches):
  # Asserts that the number of profile counters named 'key' with value 'val' is one
  # of the acceptable counts in 'num_matches' (an int or a list of ints).
  if not isinstance(num_matches, list):
    num_matches = [num_matches]
  values = getCounterValues(profile, key)
  assert len([v for v in values if v == val]) in num_matches, values
| |
| |
| def assertCounters(profile, num_hits, num_halted, num_skipped, num_matches=1): |
| assertCounter(profile, NUM_HITS, num_hits, num_matches) |
| assertCounter(profile, NUM_HALTED, num_halted, num_matches) |
| assertCounter(profile, NUM_SKIPPED, num_skipped, num_matches) |
| |
| |
| def get_cache_keys(profile): |
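  # Each "Combined Key" line in the profile carries a tuple cache node's cache key.
  # With a single TUPLE_CACHE_NODE this returns a flat list of its combined keys;
  # with several, a dict of node id -> sorted keys, e.g. (hypothetical keys)
  # {0: ['abc_0'], 2: ['def_123', 'def_456']}.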
| cache_keys = {} |
| last_node_id = -1 |
| matcher = re.compile(r'TUPLE_CACHE_NODE \(id=([0-9]*)\)') |
| for line in profile.splitlines(): |
| if "Combined Key:" in line: |
| key = line.split(":")[1].strip() |
| cache_keys[last_node_id].append(key) |
| continue |
| |
| match = matcher.search(line) |
| if match: |
| last_node_id = int(match.group(1)) |
| if last_node_id not in cache_keys: |
| cache_keys[last_node_id] = [] |
| |
| # Sort cache keys: with multiple nodes, order in the profile may change. |
  for val in cache_keys.values():
| val.sort() |
| |
| return next(iter(cache_keys.values())) if len(cache_keys) == 1 else cache_keys |
| |
| |
| def assert_deterministic_scan(vector, profile): |
| if vector.get_value('exec_option')['mt_dop'] > 0: |
| assert "deterministic scan range assignment: true" in profile |
| |
| |
| class TestTupleCacheBase(CustomClusterTestSuite): |
| @classmethod |
| def setup_class(cls): |
| super(TestTupleCacheBase, cls).setup_class() |
    # Unset this environment variable to ensure it doesn't affect tests like
    # test_cache_disabled.
| cls.org_tuple_cache_dir = os.getenv("TUPLE_CACHE_DIR") |
| if cls.org_tuple_cache_dir is not None: |
| os.unsetenv("TUPLE_CACHE_DIR") |
| |
| @classmethod |
| def teardown_class(cls): |
| if cls.org_tuple_cache_dir is not None: |
| os.environ["TUPLE_CACHE_DIR"] = cls.org_tuple_cache_dir |
| super(TestTupleCacheBase, cls).teardown_class() |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheBase, cls).add_test_dimensions() |
| add_mandatory_exec_option(cls, 'enable_tuple_cache', 'true') |
| add_mandatory_exec_option(cls, 'tuple_cache_placement_policy', 'all_eligible') |
| |
  # Generates a table containing at least <scale> KB of data. Each loop iteration
  # inserts 70 rows of at least 15 bytes each, i.e. at least ~1 KB.
| def create_table(self, fq_table, scale=1): |
| self.execute_query("CREATE TABLE {0} ({1})".format(fq_table, TABLE_LAYOUT)) |
| # To make the rows distinct, we keep using a different seed for table_value |
| global_index = 0 |
| for _ in range(scale): |
| values = [table_value(i) for i in range(global_index, global_index + 70)] |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format( |
| fq_table, "), (".join(values))) |
| global_index += 70 |
| |
  # Helper function to get a tuple cache metric from a single impalad, e.g. a suffix
  # of "entries-in-use" reads the metric 'impala.tuple-cache.entries-in-use'.
| def get_tuple_cache_metric(self, impalaservice, suffix): |
| return impalaservice.get_metric_value('impala.tuple-cache.' + suffix) |
| |
| |
| class TestTupleCacheOptions(TestTupleCacheBase): |
| """Tests Impala with different tuple cache startup options.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheOptions, cls).add_test_dimensions() |
| add_mandatory_exec_option(cls, 'mt_dop', 1) |
| |
| @CustomClusterTestSuite.with_args(cluster_size=1) |
| def test_cache_disabled(self, vector, unique_database): |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.cache_disabled".format(unique_database) |
| self.create_table(fq_table) |
| result1 = self.execute_query("SELECT * from {0}".format(fq_table)) |
| result2 = self.execute_query("SELECT * from {0}".format(fq_table)) |
| |
| assert result1.success |
| assert result2.success |
| assert result1.data == result2.data |
| assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) |
| assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) |
| |
| @CustomClusterTestSuite.with_args( |
| start_args=CACHE_START_ARGS + " --tuple_cache_capacity=64MB", cluster_size=1, |
| impalad_args="--cache_force_single_shard") |
| def test_cache_halted_select(self, vector): |
    # The cache is set to the minimum cache size, so run a query that produces
    # enough data to exceed the cache size and halt caching.
| self.client.set_configuration(vector.get_value('exec_option')) |
| big_enough_query = "SELECT o_comment from tpch.orders" |
| result1 = self.execute_query(big_enough_query) |
| result2 = self.execute_query(big_enough_query) |
| |
| assert result1.success |
| assert result2.success |
| assert result1.data == result2.data |
| assertCounters(result1.runtime_profile, num_hits=0, num_halted=1, num_skipped=0) |
| bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten") |
| # This is running on a single node, so there should be a single location where |
| # TupleCacheBytesWritten exceeds 0. |
| assert len([v for v in bytes_written if v > 0]) == 1 |
| assertCounters(result2.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) |
| |
| @CustomClusterTestSuite.with_args( |
| start_args=CACHE_START_ARGS, cluster_size=1, |
| impalad_args="--tuple_cache_ignore_query_options=true") |
| def test_failpoints(self, vector, unique_database): |
| fq_table = "{0}.failpoints".format(unique_database) |
    # Scale 20 gets us enough rows to force multiple RowBatches (needed for the
    # reader GetNext() cases).
| self.create_table(fq_table, scale=20) |
| query = "SELECT * from {0}".format(fq_table) |
| |
| def execute_debug(query, action): |
| exec_options = dict(vector.get_value('exec_option')) |
| exec_options['debug_action'] = action |
| return self.execute_query(query, exec_options) |
| |
| # Fail when writing cache entry. All of these are handled and will not fail the |
| # query. |
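    # Debug actions have the form "<label>:FAIL@<probability>" (Impala's debug_action
    # convention); FAIL@1.0 triggers the failure on every execution of that point.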
| # Case 1: fail during Open() |
| result = execute_debug(query, "TUPLE_FILE_WRITER_OPEN:FAIL@1.0") |
| assert result.success |
| assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) |
| |
| # Case 2: fail during Write() |
| result = execute_debug(query, "TUPLE_FILE_WRITER_WRITE:FAIL@1.0") |
| assert result.success |
| assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| |
| # Case 3: fail during Commit() |
| result = execute_debug(query, "TUPLE_FILE_WRITER_COMMIT:FAIL@1.0") |
| assert result.success |
| assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| |
| # Now, successfully add a cache entry |
| result1 = self.execute_query(query, vector.get_value('exec_option')) |
| assert result1.success |
| assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| |
| # Fail when reading a cache entry |
| # Case 1: fail during Open() |
| result = execute_debug(query, "TUPLE_FILE_READER_OPEN:FAIL@1.0") |
| assert result.success |
| # Do an unordered compare (the rows are unique) |
| assert set(result.data) == set(result1.data) |
| # Not a hit |
| assertCounters(result.runtime_profile, num_hits=0, num_halted=0, num_skipped=1) |
| |
| # Case 2: fail during the first GetNext() call |
| result = execute_debug(query, "TUPLE_FILE_READER_FIRST_GETNEXT:FAIL@1.0") |
| assert result.success |
| # Do an unordered compare (the rows are unique) |
| assert set(result.data) == set(result1.data) |
| # Technically, this is a hit |
| assertCounters(result.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| |
| # Case 3: fail during the second GetNext() call |
| # This one must fail for correctness, as it cannot fall back to the child if it |
| # has already returned cached rows |
| hit_error = False |
| try: |
| result = execute_debug(query, "TUPLE_FILE_READER_SECOND_GETNEXT:FAIL@1.0") |
| except Exception: |
| hit_error = True |
| |
| assert hit_error |
| |
| @CustomClusterTestSuite.with_args( |
| start_args=CACHE_START_ARGS, cluster_size=1, |
| impalad_args='--tuple_cache_exempt_query_options=max_errors,exec_time_limit_s') |
| def test_custom_exempt_query_options(self, vector, unique_database): |
| """Custom list of exempt query options share cache entry""" |
| fq_table = "{0}.query_options".format(unique_database) |
| self.create_table(fq_table) |
| query = "SELECT * from {0}".format(fq_table) |
| |
| errors_10 = dict(vector.get_value('exec_option')) |
| errors_10['max_errors'] = '10' |
| exec_time_limit = dict(vector.get_value('exec_option')) |
| exec_time_limit['exec_time_limit_s'] = '30' |
| |
| exempt1 = self.execute_query(query, query_options=errors_10) |
| exempt2 = self.execute_query(query, query_options=exec_time_limit) |
| exempt3 = self.execute_query(query, query_options=vector.get_value('exec_option')) |
| assert exempt1.success |
| assert exempt2.success |
| assert exempt1.data == exempt2.data |
| assert exempt1.data == exempt3.data |
| assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| |
| |
| @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1) |
| class TestTupleCacheSingle(TestTupleCacheBase): |
| """Tests Impala with a single executor and mt_dop=1.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheSingle, cls).add_test_dimensions() |
| add_mandatory_exec_option(cls, 'mt_dop', 1) |
| |
| def test_create_and_select(self, vector, unique_database): |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.create_and_select".format(unique_database) |
| self.create_table(fq_table) |
| result1 = self.execute_query("SELECT * from {0}".format(fq_table)) |
| result2 = self.execute_query("SELECT * from {0}".format(fq_table)) |
| |
| assert result1.success |
| assert result2.success |
| assert result1.data == result2.data |
| assertCounters(result1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| assertCounters(result2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| # Verify that the bytes written by the first profile are the same as the bytes |
| # read by the second profile. |
| bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten") |
| bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead") |
| assert sorted(bytes_written) == sorted(bytes_read) |
| |
| def test_non_exempt_query_options(self, vector, unique_database): |
| """Non-exempt query options result in different cache entries""" |
| fq_table = "{0}.query_options".format(unique_database) |
| self.create_table(fq_table) |
| query = "SELECT * from {0}".format(fq_table) |
| |
| strict_true = dict(vector.get_value('exec_option')) |
| strict_true['strict_mode'] = 'true' |
| strict_false = dict(vector.get_value('exec_option')) |
| strict_false['strict_mode'] = 'false' |
| |
| noexempt1 = self.execute_query(query, query_options=strict_false) |
| noexempt2 = self.execute_query(query, query_options=strict_true) |
| noexempt3 = self.execute_query(query, query_options=strict_false) |
| noexempt4 = self.execute_query(query, query_options=strict_true) |
| noexempt5 = self.execute_query(query, query_options=vector.get_value('exec_option')) |
| |
| assert noexempt1.success |
| assert noexempt2.success |
| assert noexempt3.success |
| assert noexempt4.success |
| assert noexempt5.success |
| assert noexempt1.data == noexempt2.data |
| assert noexempt1.data == noexempt3.data |
| assert noexempt1.data == noexempt4.data |
| assert noexempt1.data == noexempt5.data |
| assertCounters(noexempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| assertCounters(noexempt2.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| assertCounters(noexempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| assertCounters(noexempt4.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| assertCounters(noexempt5.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| |
| def test_exempt_query_options(self, vector, unique_database): |
| """Exempt query options share cache entry""" |
| fq_table = "{0}.query_options".format(unique_database) |
| self.create_table(fq_table) |
| query = "SELECT * from {0}".format(fq_table) |
| |
| codegen_false = dict(vector.get_value('exec_option')) |
| codegen_false['disable_codegen'] = 'true' |
| codegen_true = dict(vector.get_value('exec_option')) |
| codegen_true['disable_codegen'] = 'false' |
| |
| exempt1 = self.execute_query(query, query_options=codegen_true) |
| exempt2 = self.execute_query(query, query_options=codegen_false) |
| exempt3 = self.execute_query(query, query_options=vector.get_value('exec_option')) |
| assert exempt1.success |
| assert exempt2.success |
| assert exempt1.data == exempt2.data |
| assert exempt1.data == exempt3.data |
| assertCounters(exempt1.runtime_profile, num_hits=0, num_halted=0, num_skipped=0) |
| assertCounters(exempt2.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| assertCounters(exempt3.runtime_profile, num_hits=1, num_halted=0, num_skipped=0) |
| |
| def test_aggregate(self, vector, unique_database): |
| """Simple aggregation can be cached""" |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.agg".format(unique_database) |
| self.create_table(fq_table) |
| |
| result1 = self.execute_query("SELECT sum(age) FROM {0}".format(fq_table)) |
| result2 = self.execute_query("SELECT sum(age) FROM {0}".format(fq_table)) |
| |
| assert result1.success |
| assert result2.success |
| assert result1.data == result2.data |
| assertCounters(result1.runtime_profile, 0, 0, 0, num_matches=2) |
    # The aggregate should hit, and the scan node below it will miss.
| assertCounterOrder(result2.runtime_profile, NUM_HITS, [1, 0]) |
| assertCounter(result2.runtime_profile, NUM_HALTED, 0, num_matches=2) |
| assertCounter(result2.runtime_profile, NUM_SKIPPED, 0, num_matches=2) |
| # Verify that the bytes written by the first profile are the same as the bytes |
| # read by the second profile. |
| bytes_written = getCounterValues(result1.runtime_profile, "TupleCacheBytesWritten") |
| bytes_read = getCounterValues(result2.runtime_profile, "TupleCacheBytesRead") |
| assert len(bytes_written) == 2 |
| assert len(bytes_read) == 1 |
| assert bytes_written[0] == bytes_read[0] |
| |
| def test_aggregate_reuse(self, vector): |
| """Cached aggregation can be re-used""" |
| self.client.set_configuration(vector.get_value('exec_option')) |
| |
| result = self.execute_query("SELECT sum(int_col) FROM functional.alltypes") |
| assert result.success |
| assertCounters(result.runtime_profile, 0, 0, 0, num_matches=2) |
| |
| result_scan = self.execute_query("SELECT avg(int_col) FROM functional.alltypes") |
| assert result_scan.success |
| assertCounterOrder(result_scan.runtime_profile, NUM_HITS, [0, 1]) |
| |
| result_agg = self.execute_query( |
| "SELECT avg(a) FROM (SELECT sum(int_col) as a FROM functional.alltypes) b") |
| assert result_agg.success |
| assertCounterOrder(result_agg.runtime_profile, NUM_HITS, [1, 0]) |
| |
| def test_parquet_resolution_by_name(self, vector, unique_database): |
| """Verify that parquet_fallback_schema_resolution=NAME works with tuple caching""" |
| self.run_test_case('QueryTest/parquet-resolution-by-name', vector, |
| use_db=unique_database) |
| |
| def test_partition_information(self, vector): |
| """Verify that partition information is incorporated into the runtime cache key""" |
| self.client.set_configuration(vector.get_value('exec_option')) |
| |
| # scale_db.num_partitions_1234_blocks_per_partition_1 is an exotic table where all |
| # the partitions point to the same filesystem location. A single file is read many |
| # times for different partitions. It is not possible to tell the partitions apart |
| # by the file path, so this verifies that the partition information is being included |
| # properly. |
| query_template = \ |
| "select i, j from scale_db.num_partitions_1234_blocks_per_partition_1 where j={0}" |
| # Run against the j=1 partition |
| result1 = self.execute_query(query_template.format(1)) |
| assert result1.success |
| assertCounters(result1.runtime_profile, 0, 0, 0) |
| assert len(result1.data) == 1 |
| assert result1.data[0].split("\t") == ["1", "1"] |
| |
| # Run against the j=2 partition. There should not be a cache hit, because they are |
| # running against different partitions. This only works if the runtime key |
| # incorporates the partition information. |
| result2 = self.execute_query(query_template.format(2)) |
| assert result2.success |
| assertCounters(result2.runtime_profile, 0, 0, 0) |
| assert len(result2.data) == 1 |
| assert result2.data[0].split("\t") == ["1", "2"] |
| |
| def test_json_binary_format(self, vector, unique_database): |
| """This is identical to test_scanners.py's TestBinaryType::test_json_binary_format. |
| That test modifies a table's serde properties to change the json binary format. |
| The tuple cache detects that by including the partition's storage descriptor |
| information. This fails if that doesn't happen.""" |
| test_tbl = unique_database + '.binary_tbl' |
| self.clone_table('functional_json.binary_tbl', test_tbl, False, vector) |
| self.run_test_case('QueryTest/json-binary-format', vector, unique_database) |
| |
| def test_complex_types_verification(self, vector): |
| """Run with correctness verification and check that it works with a query that |
| selects complex types.""" |
| # We use custom query options to turn on verification. We also need to use |
| # expand_complex_types=true so that * includes columns with complex types |
| custom_options = dict(vector.get_value('exec_option')) |
| custom_options['enable_tuple_cache_verification'] = 'true' |
| custom_options['expand_complex_types'] = 'true' |
| |
| # functional_parquet.complextypestbl has multiple columns with different types |
| # of complex types. e.g. nested_struct is a struct with multiple nested fields. |
| query = "select * from functional_parquet.complextypestbl" |
| result1 = self.execute_query(query, query_options=custom_options) |
| assert result1.success |
| assertCounters(result1.runtime_profile, 0, 0, 0) |
| |
| # The second run is when correctness verification kicks in and tests the printing |
| # logic. |
| result2 = self.execute_query(query, query_options=custom_options) |
| assert result2.success |
    # The regular counters see this as a skip.
| assertCounters(result2.runtime_profile, 0, 0, 1) |
| assertCounter(result2.runtime_profile, NUM_CORRECTNESS_VERIFICATION, 1, 1) |
    # ORDER BY is currently not supported with complex type results, so sort the
    # results before comparing them.
| assert sorted(result1.data) == sorted(result2.data) |
| |
| |
| @CustomClusterTestSuite.with_args( |
| start_args=CACHE_START_ARGS, |
| impalad_args="--use_local_catalog=false", |
| catalogd_args="--catalog_topic_mode=full") |
| class TestTupleCacheCluster(TestTupleCacheBase): |
| """Tests Impala with 3 executors and mt_dop=1.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheCluster, cls).add_test_dimensions() |
| add_mandatory_exec_option(cls, 'mt_dop', 1) |
| |
| def test_runtime_filters(self, vector, unique_database): |
| """ |
    This tests that adding files to a table results in different runtime filter keys.
    The assertions after 'invalidate metadata' only hold if the Impala cluster is in
    legacy catalog mode.
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.runtime_filters".format(unique_database) |
| # A query containing multiple runtime filters |
| # - scan of A receives runtime filters from B and C, so it depends on contents of B/C |
| # - scan of B receives runtime filter from C, so it depends on contents of C |
| query = "select straight_join a.id from functional.alltypes a, functional.alltypes" \ |
| " b, {0} c where a.id = b.id and a.id = c.age order by a.id".format(fq_table) |
| query_a_id = 10 |
| query_b_id = 11 |
| query_c_id = 12 |
| |
| # Create an empty table |
| self.create_table(fq_table, scale=0) |
| |
| # Establish a baseline |
| empty_result = self.execute_query(query) |
| empty_cache_keys = get_cache_keys(empty_result.runtime_profile) |
| # Tables a and b have multiple files, so they are distributed across all 3 nodes. |
| # Table c has one file, so it has a single entry. |
| assert len(empty_cache_keys) == 3 |
| assert len(empty_cache_keys[query_c_id]) == 1 |
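    # A combined key has the form "<compile_key>_<finst_key>".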
| empty_c_compile_key, empty_c_finst_key = empty_cache_keys[query_c_id][0].split("_") |
| assert empty_c_finst_key == "0" |
| assert len(empty_result.data) == 0 |
| |
| # Insert a row, which creates a file / scan range |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(0))) |
| |
| # Now, there is a scan range, so the fragment instance key should be non-zero. |
| one_file_result = self.execute_query(query) |
| one_cache_keys = get_cache_keys(one_file_result.runtime_profile) |
| assert len(one_cache_keys) == 3 |
    assert len(one_cache_keys[query_c_id]) == 1
| one_c_compile_key, one_c_finst_key = one_cache_keys[query_c_id][0].split("_") |
| assert one_c_finst_key != "0" |
| # This should be a cache miss |
| assertCounters(one_file_result.runtime_profile, 0, 0, 0, 7) |
| assert len(one_file_result.data) == 1 |
| |
| # The new scan range did not change the compile-time key, but did change the runtime |
| # filter keys. |
| for id in [query_a_id, query_b_id]: |
| assert len(empty_cache_keys[id]) == len(one_cache_keys[id]) |
| for empty, one in zip(empty_cache_keys[id], one_cache_keys[id]): |
| assert empty != one |
| assert empty_c_compile_key == one_c_compile_key |
| |
| # Insert another row, which creates a file / scan range |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(1))) |
| |
| # There is a second scan range, so the fragment instance key should change again |
| two_files_result = self.execute_query(query) |
| two_cache_keys = get_cache_keys(two_files_result.runtime_profile) |
| assert len(two_cache_keys) == 3 |
| assert len(two_cache_keys[query_c_id]) == 2 |
| two_c1_compile_key, two_c1_finst_key = two_cache_keys[query_c_id][0].split("_") |
| two_c2_compile_key, two_c2_finst_key = two_cache_keys[query_c_id][1].split("_") |
| assert two_c1_finst_key != "0" |
| assert two_c2_finst_key != "0" |
| # There may be a cache hit for the prior "c" scan range (if scheduled to the same |
| # instance), and the rest cache misses. |
| assertCounter(two_files_result.runtime_profile, NUM_HITS, 0, num_matches=[7, 8]) |
| assertCounter(two_files_result.runtime_profile, NUM_HITS, 1, num_matches=[0, 1]) |
| assertCounter(two_files_result.runtime_profile, NUM_HALTED, 0, num_matches=8) |
| assertCounter(two_files_result.runtime_profile, NUM_SKIPPED, 0, num_matches=8) |
| assert len(two_files_result.data) == 2 |
| # Ordering can vary by environment. Ensure one matches and one differs. |
| assert one_c_finst_key == two_c1_finst_key or one_c_finst_key == two_c2_finst_key |
| assert one_c_finst_key != two_c1_finst_key or one_c_finst_key != two_c2_finst_key |
| overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data)) |
| assert len(overlapping_rows) == 1 |
| |
    # The new scan range did not change the compile-time key, but did change the
    # runtime filter keys (now comparing the one-file and two-file runs).
    for id in [query_a_id, query_b_id]:
      assert len(one_cache_keys[id]) == len(two_cache_keys[id])
      for one, two in zip(one_cache_keys[id], two_cache_keys[id]):
        assert one != two
| assert one_c_compile_key == two_c1_compile_key |
| assert one_c_compile_key == two_c2_compile_key |
| |
| # Invalidate metadata and rerun the last query. The keys should stay the same. |
| self.execute_query("invalidate metadata") |
| rerun_two_files_result = self.execute_query(query) |
| # Verify that this is a cache hit |
| assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0, num_matches=8) |
| rerun_cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile) |
| assert rerun_cache_keys == two_cache_keys |
| assert rerun_two_files_result.data == two_files_result.data |
| |
| def test_runtime_filter_reload(self, vector, unique_database): |
| """ |
    This tests that dropping and recreating a table over the same files results in
    matching runtime filter keys.
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.runtime_filter_genspec".format(unique_database) |
| # Query where fq_table generates a runtime filter. |
| query = "select straight_join a.id from functional.alltypes a, {0} b " \ |
| "where a.id = b.age order by a.id".format(fq_table) |
| |
| # Create a partitioned table with 3 partitions |
| self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING) " |
| "PARTITIONED BY (age INT)".format(fq_table)) |
| self.execute_query( |
| "INSERT INTO {0} PARTITION(age=4) VALUES (\"Vanessa\")".format(fq_table)) |
| self.execute_query( |
| "INSERT INTO {0} PARTITION(age=5) VALUES (\"Carl\")".format(fq_table)) |
| self.execute_query( |
| "INSERT INTO {0} PARTITION(age=6) VALUES (\"Cleopatra\")".format(fq_table)) |
| |
| # Prime the cache |
| base_result = self.execute_query(query) |
| base_cache_keys = get_cache_keys(base_result.runtime_profile) |
| assert len(base_cache_keys) == 3 |
| |
| # Drop and reload the table |
| self.execute_query("DROP TABLE {0}".format(fq_table)) |
| self.execute_query("CREATE EXTERNAL TABLE {0} (name STRING, address STRING) " |
| "PARTITIONED BY (age INT)".format(fq_table)) |
| self.execute_query("ALTER TABLE {0} RECOVER PARTITIONS".format(fq_table)) |
| |
| # Verify we reuse the cache |
| reload_result = self.execute_query(query) |
| reload_cache_keys = get_cache_keys(reload_result.runtime_profile) |
| assert base_result.data == reload_result.data |
| assert base_cache_keys == reload_cache_keys |
    # Skip verifying cache hits, as fragments may not be assigned to the same nodes.
| |
| def test_join_modifications(self, vector, unique_database): |
| """ |
    This tests caching above a join without runtime filters and verifies that changes
    to the build side table result in a different cache key.
| """ |
| fq_table = "{0}.join_modifications".format(unique_database) |
| query = "select straight_join probe.id from functional.alltypes probe join " \ |
| "/* +broadcast */ {0} build on (probe.id = build.age) ".format(fq_table) + \ |
| "order by probe.id" |
| # Create an empty table |
| self.create_table(fq_table, scale=0) |
| probe_id = 6 |
| build_id = 7 |
| above_join_id = 8 |
| |
| # Run without runtime filters to verify the regular path works |
| no_runtime_filters = dict(vector.get_value('exec_option')) |
| no_runtime_filters['runtime_filter_mode'] = 'off' |
| |
| # Establish a baseline |
| empty_result = self.execute_query(query, no_runtime_filters) |
| empty_cache_keys = get_cache_keys(empty_result.runtime_profile) |
| # The build side is on one node. The probe side is on three nodes. |
| assert len(empty_cache_keys) == 3 |
| assert len(empty_cache_keys[probe_id]) == 3 |
| assert len(empty_cache_keys[build_id]) == 1 |
| assert len(empty_cache_keys[above_join_id]) == 3 |
| empty_build_key = empty_cache_keys[build_id][0] |
| empty_build_compile_key, empty_build_finst_key = empty_build_key.split("_") |
| assert empty_build_finst_key == "0" |
| assert len(empty_result.data) == 0 |
| empty_join_compile_key = empty_cache_keys[above_join_id][0].split("_")[0] |
| |
| # Insert a row, which creates a file / scan range |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format(fq_table, table_value(0))) |
| |
| # There is a build-side scan range, so the fragment instance key should be non-zero. |
| one_file_result = self.execute_query(query, no_runtime_filters) |
| assert len(one_file_result.data) == 1 |
| one_cache_keys = get_cache_keys(one_file_result.runtime_profile) |
| assert len(one_cache_keys) == 3 |
| assert len(one_cache_keys[probe_id]) == 3 |
| assert len(one_cache_keys[build_id]) == 1 |
| assert len(one_cache_keys[above_join_id]) == 3 |
| one_build_key = one_cache_keys[build_id][0] |
| one_build_compile_key, one_build_finst_key = one_build_key.split("_") |
| assert one_build_finst_key != "0" |
| assert one_build_compile_key == empty_build_compile_key |
| # This should be a cache miss for the build side and above the join, but a cache |
| # hit for the probe side (3 instances). |
| assertCounter(one_file_result.runtime_profile, NUM_HITS, 1, 3) |
| assertCounter(one_file_result.runtime_profile, NUM_HALTED, 0, 7) |
| assertCounter(one_file_result.runtime_profile, NUM_SKIPPED, 0, 7) |
| # The above join compile time key should have changed, because it incorporates the |
| # build side scan ranges. |
| one_join_compile_key = one_cache_keys[above_join_id][0].split("_")[0] |
| assert one_join_compile_key != empty_join_compile_key |
| |
| def test_join_timing(self, vector): |
| """ |
| This verifies that a very short query with a cache hit above a join can complete |
| below a certain threshold. This should be sensitive to issues with synchronization |
| with the shared join builder. |
| """ |
| query = "select straight_join probe.id from functional.alltypes probe join " \ |
| "/* +broadcast */ functional.alltypes build on (probe.id = build.id) " \ |
| "order by probe.id" |
| |
| # To avoid interaction with cache entries from previous tests, set an unrelated |
| # query option to keep the key different. |
| custom_options = dict(vector.get_value('exec_option')) |
| custom_options['batch_size'] = '1234' |
| |
| first_run_result = self.execute_query(query, custom_options) |
| assert len(first_run_result.data) == 7300 |
| assertCounter(first_run_result.runtime_profile, NUM_HITS, 0, 9) |
| assertCounter(first_run_result.runtime_profile, NUM_HALTED, 0, 9) |
| assertCounter(first_run_result.runtime_profile, NUM_SKIPPED, 0, 9) |
| start_time = time.time() |
| second_run_result = self.execute_query(query, custom_options) |
| end_time = time.time() |
| # The location above the join hits and the location on the build side hits, |
| # but the probe location is below the join and doesn't hit. |
| assertCounter(second_run_result.runtime_profile, NUM_HITS, 1, 6) |
| assertCounter(second_run_result.runtime_profile, NUM_HALTED, 0, 9) |
| assertCounter(second_run_result.runtime_profile, NUM_SKIPPED, 0, 9) |
| # As a sanity check for the synchronization pieces, verify that this runs in less |
| # than 750 milliseconds. |
| assert end_time - start_time < 0.75 |
| |
| |
| @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1) |
| class TestTupleCacheRuntimeKeysBasic(TestTupleCacheBase): |
| """Simpler tests that run on a single node with mt_dop=0 or mt_dop=1.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheRuntimeKeysBasic, cls).add_test_dimensions() |
| add_exec_option_dimension(cls, 'mt_dop', [0, 1]) |
| |
| def test_scan_range_basics(self, vector, unique_database): |
| """ |
    This tests that adding files to a table results in different keys.
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.scan_range_basics".format(unique_database) |
| query = "SELECT * from {0}".format(fq_table) |
| |
| # Create an empty table |
| self.create_table(fq_table, scale=0) |
| |
    # When there are no scan ranges, the fragment instance key is 0. This is
    # somewhat of a toy case, and we probably want to avoid caching in this case.
    # Nonetheless, it is a good sanity check.
| empty_result = self.execute_query(query) |
| cache_keys = get_cache_keys(empty_result.runtime_profile) |
| assert len(cache_keys) == 1 |
| empty_table_compile_key, empty_table_finst_key = cache_keys[0].split("_") |
| assert empty_table_finst_key == "0" |
| assert len(empty_result.data) == 0 |
| assert_deterministic_scan(vector, empty_result.runtime_profile) |
| |
| # Insert a row, which creates a file / scan range |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format( |
| fq_table, table_value(0))) |
| |
| # Now, there is a scan range, so the fragment instance key should be non-zero. |
| one_file_result = self.execute_query(query) |
| cache_keys = get_cache_keys(one_file_result.runtime_profile) |
| assert len(cache_keys) == 1 |
| one_file_compile_key, one_file_finst_key = cache_keys[0].split("_") |
| assert one_file_finst_key != "0" |
| # This should be a cache miss |
| assertCounters(one_file_result.runtime_profile, 0, 0, 0) |
| assert len(one_file_result.data) == 1 |
| assert_deterministic_scan(vector, one_file_result.runtime_profile) |
| |
| # The new scan range did not change the compile-time key |
| assert empty_table_compile_key == one_file_compile_key |
| |
| # Insert another row, which creates a file / scan range |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format( |
| fq_table, table_value(1))) |
| |
| # There is a second scan range, so the fragment instance key should change again |
| two_files_result = self.execute_query(query) |
| cache_keys = get_cache_keys(two_files_result.runtime_profile) |
| assert len(cache_keys) == 1 |
| two_files_compile_key, two_files_finst_key = cache_keys[0].split("_") |
| assert two_files_finst_key != "0" |
| assertCounters(two_files_result.runtime_profile, 0, 0, 0) |
| assert len(two_files_result.data) == 2 |
| assert one_file_finst_key != two_files_finst_key |
| overlapping_rows = set(one_file_result.data).intersection(set(two_files_result.data)) |
| assert len(overlapping_rows) == 1 |
| assert_deterministic_scan(vector, two_files_result.runtime_profile) |
| |
| # The new scan range did not change the compile-time key |
| assert one_file_compile_key == two_files_compile_key |
| |
| # Invalidate metadata and rerun the last query. The keys should stay the same. |
| self.execute_query("invalidate metadata") |
| rerun_two_files_result = self.execute_query(query) |
| # Verify that this is a cache hit |
| assertCounters(rerun_two_files_result.runtime_profile, 1, 0, 0) |
| cache_keys = get_cache_keys(rerun_two_files_result.runtime_profile) |
| assert len(cache_keys) == 1 |
| rerun_two_files_compile_key, rerun_two_files_finst_key = cache_keys[0].split("_") |
| assert rerun_two_files_finst_key == two_files_finst_key |
| assert rerun_two_files_compile_key == two_files_compile_key |
| assert rerun_two_files_result.data == two_files_result.data |
| |
| def test_scan_range_partitioned(self, vector): |
| """ |
| This tests a basic partitioned case where the query is identical except that |
| it operates on different partitions (and thus different scan ranges). |
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| year2009_result = self.execute_query( |
| "select * from functional.alltypes where year=2009") |
| cache_keys = get_cache_keys(year2009_result.runtime_profile) |
| assert len(cache_keys) == 1 |
| year2009_compile_key, year2009_finst_key = cache_keys[0].split("_") |
| |
| year2010_result = self.execute_query( |
| "select * from functional.alltypes where year=2010") |
| cache_keys = get_cache_keys(year2010_result.runtime_profile) |
| assert len(cache_keys) == 1 |
| year2010_compile_key, year2010_finst_key = cache_keys[0].split("_") |
| # This should be a cache miss |
| assertCounters(year2010_result.runtime_profile, 0, 0, 0) |
| |
| # The year=X predicate is on a partition column, so it is enforced by pruning |
| # partitions and doesn't carry through to execution. The compile keys for |
| # the two queries are the same, but the fragment instance keys are different due |
| # to the different scan ranges from different partitions. |
| assert year2009_compile_key == year2010_compile_key |
| assert year2009_finst_key != year2010_finst_key |
| # Verify that the results are completely different |
| year2009_result_set = set(year2009_result.data) |
| year2010_result_set = set(year2010_result.data) |
| overlapping_rows = year2009_result_set.intersection(year2010_result_set) |
| assert len(overlapping_rows) == 0 |
| assert year2009_result.data[0].find("2009") != -1 |
| assert year2009_result.data[0].find("2010") == -1 |
| assert year2010_result.data[0].find("2010") != -1 |
| assert year2010_result.data[0].find("2009") == -1 |
| |
| |
| @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS) |
| class TestTupleCacheFullCluster(TestTupleCacheBase): |
| """Test with 3 executors and a range of mt_dop values.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheFullCluster, cls).add_test_dimensions() |
| add_exec_option_dimension(cls, 'mt_dop', [0, 1, 2]) |
| |
| def test_scan_range_distributed(self, vector, unique_database): |
| """ |
| This tests the distributed case where there are multiple fragment instances |
| processing different scan ranges. Each fragment instance should have a |
| distinct cache key. When adding a scan range, at least one fragment instance |
| cache key should change. |
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| mt_dop = vector.get_value('exec_option')['mt_dop'] |
| fq_table = "{0}.scan_range_distributed".format(unique_database) |
| query = "SELECT * from {0}".format(fq_table) |
| |
| entries_baseline = { |
| impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use") |
| for impalad in self.cluster.impalads} |
| |
| # Create a table with several files so that we always have enough work for multiple |
| # fragment instances |
| self.create_table(fq_table, scale=20) |
| |
| # We run a simple select. This is running with multiple impalads, so there are |
| # always multiple fragment instances |
| before_result = self.execute_query(query) |
| cache_keys = get_cache_keys(before_result.runtime_profile) |
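    # Expect one cache key per scan fragment instance: 3 impalads, each running
    # max(mt_dop, 1) instances (mt_dop=0 executes a single instance per node).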
| expected_num_keys = 3 * max(mt_dop, 1) |
| assert len(cache_keys) == expected_num_keys |
| # Every cache key should be distinct, as the fragment instances are processing |
| # different data |
| unique_cache_keys = set(cache_keys) |
| assert len(unique_cache_keys) == expected_num_keys |
| # Every cache key has the same compile key |
| unique_compile_keys = set([key.split("_")[0] for key in unique_cache_keys]) |
| assert len(unique_compile_keys) == 1 |
    # Verify the cache metrics for each impalad. The number of new cache entries per
    # impalad should match the number of cache keys per node, i.e. max(mt_dop, 1).
| for impalad in self.cluster.impalads: |
| entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use") |
| assert entries_in_use - entries_baseline[impalad] == max(mt_dop, 1) |
| assert_deterministic_scan(vector, before_result.runtime_profile) |
| |
| entries_before_insert = { |
| impalad: self.get_tuple_cache_metric(impalad.service, "entries-in-use") |
| for impalad in self.cluster.impalads} |
| |
| # Some modification times have coarse granularity (e.g. a second). If the insert runs |
| # too quickly, the new file could have the same modification time as an existing |
| # file. In that case, the sort may not place it last, causing unexpected changes to |
| # the cache keys. Sleep a bit to guarantee a newer modification time. |
| time.sleep(3) |
| |
| # Insert another row, which creates a file / scan range |
| # This uses a very large seed for table_value() to get a unique row that isn't |
| # already in the table. |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format( |
| fq_table, table_value(1000000))) |
| |
| # Rerun the query with the extra scan range |
| after_insert_result = self.execute_query(query) |
| cache_keys = get_cache_keys(after_insert_result.runtime_profile) |
| expected_num_keys = 3 * max(mt_dop, 1) |
| assert len(cache_keys) == expected_num_keys |
| # Every cache key should be distinct, as the fragment instances are processing |
| # different data |
| after_insert_unique_cache_keys = set(cache_keys) |
| assert len(after_insert_unique_cache_keys) == expected_num_keys |
| # Every cache key has the same compile key |
| unique_compile_keys = \ |
| set([key.split("_")[0] for key in after_insert_unique_cache_keys]) |
| assert len(unique_compile_keys) == 1 |
| |
| # Verify the cache metrics. Scheduling scan ranges from oldest to newest makes this |
| # deterministic. The new file will be scheduled last and will change exactly one |
| # cache key. |
| assert len(after_insert_unique_cache_keys - unique_cache_keys) == 1 |
| total_new_entries = 0 |
| for impalad in self.cluster.impalads: |
| new_entries_in_use = self.get_tuple_cache_metric(impalad.service, "entries-in-use") |
| new_entries_in_use -= entries_before_insert[impalad] |
| # We're comparing with before the insert, so one node will have a new entry and all |
| # others will be the same. |
| assert new_entries_in_use in [0, 1] |
| total_new_entries += new_entries_in_use |
| assert total_new_entries == 1 |
| assert_deterministic_scan(vector, after_insert_result.runtime_profile) |
| |
| # The extra scan range means that at least one fragment instance key changed |
| # Since scheduling can change completely with the addition of a single scan range, |
| # we can't assert that only one cache key changes. |
| changed_cache_keys = unique_cache_keys.symmetric_difference( |
| after_insert_unique_cache_keys) |
| assert len(changed_cache_keys) != 0 |
| |
| # Each row is distinct, so that makes it easy to verify that the results overlap |
| # except the second result contains one more row than the first result. |
| before_result_set = set(before_result.data) |
| after_insert_result_set = set(after_insert_result.data) |
| assert len(before_result_set) == 70 * 20 |
| assert len(before_result_set) + 1 == len(after_insert_result_set) |
| different_rows = before_result_set.symmetric_difference(after_insert_result_set) |
| assert len(different_rows) == 1 |
| |
| @SkipIfDockerizedCluster.internal_hostname |
| @SkipIf.hardcoded_uris |
| def test_iceberg_deletes(self, vector): # noqa: U100 |
| """ |
| Test basic Iceberg v2 deletes, which relies on the directed mode and looking |
| past TupleCacheNodes to find the scan nodes. |
| """ |
| |
| # This query tests both equality deletes and positional deletes. |
| query = "select * from functional_parquet.iceberg_v2_delete_both_eq_and_pos " + \ |
| "order by i" |
| result1 = self.execute_query(query) |
| result2 = self.execute_query(query) |
| assert result1.success and result2.success |
| |
| assert result1.data == result2.data |
| assert result1.data[0].split("\t") == ["2", "str2_updated", "2023-12-13"] |
| assert result1.data[1].split("\t") == ["3", "str3", "2023-12-23"] |
| |
| |
| @CustomClusterTestSuite.with_args(start_args=CACHE_START_ARGS, cluster_size=1) |
| class TestTupleCacheMtdop(TestTupleCacheBase): |
| """Test with single executor and mt_dop=0 or 2.""" |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestTupleCacheMtdop, cls).add_test_dimensions() |
| add_exec_option_dimension(cls, 'mt_dop', [0, 2]) |
| |
| def test_tuple_cache_count_star(self, vector, unique_database): |
| """ |
    Regression test for IMPALA-13411: run a count(*) query twice and verify that it
    does not hit the DCHECK.
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.tuple_cache_count_star".format(unique_database) |
| |
| # Create a table. |
| self.create_table(fq_table, scale=1) |
| |
| # Run twice and see if it hits the DCHECK. |
| query = "select count(*) from {0}".format(fq_table) |
| result1 = self.execute_query(query) |
| result2 = self.execute_query(query) |
| assert result1.success and result2.success |
| |
| def test_tuple_cache_key_with_stats(self, vector, unique_database): |
| """ |
    This test verifies that computing stats does not change the tuple cache key.
| """ |
| self.client.set_configuration(vector.get_value('exec_option')) |
| fq_table = "{0}.tuple_cache_stats_test".format(unique_database) |
| |
| # Create a table. |
| self.create_table(fq_table, scale=1) |
| |
| # Get the explain text for a simple query. |
| query = "explain select * from {0}".format(fq_table) |
| result1 = self.execute_query(query) |
| |
| # Insert rows to make the stats different. |
| for i in range(10): |
| self.execute_query("INSERT INTO {0} VALUES ({1})".format( |
| fq_table, table_value(i))) |
| |
| # Run compute stats and get the explain text again for the same query. |
| self.client.execute("COMPUTE STATS {0}".format(fq_table)) |
| result2 = self.execute_query(query) |
| |
    # Verify that the memory estimates are different while the cache keys are
    # identical.
| assert result1.success and result2.success |
| mem_limit1, units1 = match_memory_estimate(result1.data) |
| mem_limit1 = parse_mem_to_mb(mem_limit1, units1) |
| mem_limit2, units2 = match_memory_estimate(result2.data) |
| mem_limit2 = parse_mem_to_mb(mem_limit2, units2) |
| assert mem_limit1 != mem_limit2 |
| cache_key1 = match_cache_key(result1.data) |
| cache_key2 = match_cache_key(result2.data) |
| assert cache_key1 is not None and cache_key1 == cache_key2 |