# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import pytest
import re
import time
from beeswaxd.BeeswaxService import QueryState
from tests.common.environ import build_flavor_timeout
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.skip import SkipIfLocal, SkipIfIsilon
from tests.verifiers.metric_verifier import MetricVerifier
# slow_build_timeout is set to 200000 to avoid failures like IMPALA-8064 where the
# runtime filters don't arrive in time.
WAIT_TIME_MS = build_flavor_timeout(60000, slow_build_timeout=200000)
# Some of the queries in runtime_filters consume a lot of memory, leading to
# significant memory reservations in parallel tests.
# Skipping Isilon due to IMPALA-6998. TODO: Remove when there's a holistic revamp of
# what tests to run for non-HDFS platforms
@pytest.mark.execute_serially
@SkipIfLocal.multiple_impalad
@SkipIfIsilon.jira(reason="IMPALA-6998")
class TestRuntimeFilters(ImpalaTestSuite):
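  """End-to-end tests of runtime filter behaviour across the supported file formats.
  HBase is excluded because runtime filters are disabled for it."""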
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestRuntimeFilters, cls).add_test_dimensions()
# Runtime filters are disabled on HBase
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format not in ['hbase'])
def test_basic_filters(self, vector):
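    """Runs the runtime_filters .test file with the build-appropriate filter wait time
    substituted for $RUNTIME_FILTER_WAIT_TIME_MS."""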
self.run_test_case('QueryTest/runtime_filters', vector,
test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})
def test_wait_time(self, vector):
"""Test that a query that has global filters does not wait for them if run in LOCAL
mode"""
now = time.time()
self.run_test_case('QueryTest/runtime_filters_wait', vector)
duration_s = time.time() - now
assert duration_s < (WAIT_TIME_MS / 1000), \
"Query took too long (%ss, possibly waiting for missing filters?)" \
% str(duration_s)
@pytest.mark.execute_serially
def test_wait_time_cancellation(self, vector):
"""Regression test for IMPALA-9065 to ensure that threads waiting for filters
get woken up and exit promptly when the query is cancelled."""
# Make sure the cluster is quiesced before we start this test
self._verify_no_fragments_running()
self.change_database(self.client, vector.get_value('table_format'))
    # Set up a query where a scan (plan node 0, scanning alltypes) will wait
    # indefinitely for a filter to arrive. The filter arrival is delayed
    # by adding a wait to the scan of alltypestiny (plan node 1).
QUERY = """select straight_join *
from alltypes t1
join /*+shuffle*/ alltypestiny t2 on t1.id = t2.id"""
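    # The DEBUG_ACTION makes plan node 1 (the alltypestiny scan) wait in Open(), so its
    # filter is never produced, while the very large RUNTIME_FILTER_WAIT_TIME_MS keeps
    # the alltypes scan blocked waiting for that filter until the query is cancelled.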
self.client.set_configuration_option("DEBUG_ACTION", "1:OPEN:WAIT")
self.client.set_configuration_option("RUNTIME_FILTER_WAIT_TIME_MS", "10000000")
# Run same query with different delays to better exercise call paths.
for delay_s in [0, 1, 2]:
handle = self.client.execute_async(QUERY)
self.wait_for_state(handle, QueryState.RUNNING, 10)
time.sleep(delay_s) # Give the query time to get blocked waiting for the filter.
self.client.close_query(handle)
# Ensure that cancellation has succeeded and the cluster has quiesced.
self._verify_no_fragments_running()
def _verify_no_fragments_running(self):
"""Raise an exception if there are fragments running on the cluster after a
timeout."""
for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
verifier = MetricVerifier(impalad.service)
verifier.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=10)
verifier.wait_for_backend_admission_control_state(timeout=10)
def test_file_filtering(self, vector):
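    """Tests that runtime filters reject whole files: the profile should report
    'Files rejected: 8 (8)' and no rejected splits. Not applicable to Kudu tables."""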
if 'kudu' in str(vector.get_value('table_format')):
return
self.change_database(self.client, vector.get_value('table_format'))
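    # Use a global filter and a generous wait time so the probe-side scan waits for the
    # filter before opening files, allowing entire files to be rejected.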
self.execute_query("SET RUNTIME_FILTER_MODE=GLOBAL")
self.execute_query("SET RUNTIME_FILTER_WAIT_TIME_MS=10000")
result = self.execute_query("""select STRAIGHT_JOIN * from alltypes inner join
(select * from alltypessmall where smallint_col=-1) v
on v.year = alltypes.year""")
assert re.search("Files rejected: 8 \(8\)", result.runtime_profile) is not None
assert re.search("Splits rejected: [^0] \([^0]\)", result.runtime_profile) is None
@SkipIfLocal.multiple_impalad
class TestBloomFilters(ImpalaTestSuite):
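  """Tests of bloom filters. HBase and Kudu are excluded because bloom filters are
  disabled for them."""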
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestBloomFilters, cls).add_test_dimensions()
# Bloom filters are disabled on HBase, Kudu
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format not in ['hbase', 'kudu'])
def test_bloom_filters(self, vector):
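    """Runs the bloom_filters .test file."""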
self.run_test_case('QueryTest/bloom_filters', vector)
def test_bloom_wait_time(self, vector):
"""Test that a query that has global filters does not wait for them if run in LOCAL
mode"""
now = time.time()
self.run_test_case('QueryTest/bloom_filters_wait', vector)
duration_s = time.time() - now
assert duration_s < (WAIT_TIME_MS / 1000), \
"Query took too long (%ss, possibly waiting for missing filters?)" \
% str(duration_s)
@SkipIfLocal.multiple_impalad
class TestMinMaxFilters(ImpalaTestSuite):
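  """Tests of min-max runtime filters, which are only implemented for Kudu."""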
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestMinMaxFilters, cls).add_test_dimensions()
# Min-max filters are only implemented for Kudu.
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('table_format').file_format in ['kudu'])
def test_min_max_filters(self, vector):
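    """Runs the min_max_filters .test file with the build-appropriate wait time."""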
self.run_test_case('QueryTest/min_max_filters', vector,
test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
def test_decimal_min_max_filters(self, vector):
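    """Runs the decimal min-max filter tests. Restricted to exhaustive exploration
    because the .test file exercises many join variations."""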
if self.exploration_strategy() != 'exhaustive':
pytest.skip("skip decimal min max filter test with various joins")
self.run_test_case('QueryTest/decimal_min_max_filters', vector,
test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS': str(WAIT_TIME_MS)})
def test_large_strings(self, cursor, unique_database):
"""Tests that truncation of large strings by min-max filters still gives correct
results"""
table1 = "%s.min_max_filter_large_strings1" % unique_database
cursor.execute(
"create table %s (string_col string primary key) stored as kudu" % table1)
# Min-max bounds are truncated at 1024 characters, so construct some strings that are
# longer than that, as well as some that are very close to the min/max bounds.
    matching_vals = \
        ('b' * 1100, 'b' * 1099 + 'c', 'd' * 1100, 'f' * 1099 + 'e', 'f' * 1100)
cursor.execute("insert into %s values ('%s'), ('%s'), ('%s'), ('%s'), ('%s')"
% ((table1,) + matching_vals))
non_matching_vals = ('b' * 1099 + 'a', 'c', 'f' * 1099 + 'g')
cursor.execute("insert into %s values ('%s'), ('%s'), ('%s')"
% ((table1,) + non_matching_vals))
table2 = "%s.min_max_filter_large_strings2" % unique_database
cursor.execute(
"create table %s (string_col string primary key) stored as kudu" % table2)
cursor.execute("insert into %s values ('%s'), ('%s'), ('%s'), ('%s'), ('%s')"
% ((table2,) + matching_vals))
cursor.execute("select count(*) from %s a, %s b where a.string_col = b.string_col"
% (table1, table2))
assert cursor.fetchall() == [(len(matching_vals),)]
# Insert a string that will have the max char (255) trailing after truncation, to
# test the path where adding 1 to the max bound after trunc overflows.
max_trail_str = "concat(repeat('h', 1000), repeat(chr(255), 50))"
cursor.execute("insert into %s values (%s)" % (table1, max_trail_str))
cursor.execute("insert into %s values (%s)" % (table2, max_trail_str))
cursor.execute("select count(*) from %s a, %s b where a.string_col = b.string_col"
% (table1, table2))
assert cursor.fetchall() == [(len(matching_vals) + 1,)]
# Insert a string that is entirely the max char to test the path where the max can't
# have 1 added to it after truncation and the filter is disabled.
all_max_str = "repeat(chr(255), 1030)"
cursor.execute("insert into %s values (%s)" % (table1, all_max_str))
cursor.execute("insert into %s values (%s)" % (table2, all_max_str))
cursor.execute("select count(*) from %s a, %s b where a.string_col = b.string_col"
% (table1, table2))
assert cursor.fetchall() == [(len(matching_vals) + 2,)]
@SkipIfLocal.multiple_impalad
class TestRuntimeRowFilters(ImpalaTestSuite):
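  """Tests of row-level runtime filter application, restricted to Parquet tables."""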
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def add_test_dimensions(cls):
super(TestRuntimeRowFilters, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v:
v.get_value('table_format').file_format in ['parquet'])
def test_row_filters(self, vector):
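    """Runs the runtime_row_filters .test file with the build-appropriate wait time
    substituted for $RUNTIME_FILTER_WAIT_TIME_MS."""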
self.run_test_case('QueryTest/runtime_row_filters', vector,
test_file_vars={'$RUNTIME_FILTER_WAIT_TIME_MS' : str(WAIT_TIME_MS)})