# blob: e82b86472646ff05a30bf2c1f74d235e70a33132 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests query cancellation using the ImpalaService.Cancel API
#
import pytest
import threading
from time import sleep
from RuntimeProfile.ttypes import TRuntimeProfileFormat
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.test_vector import ImpalaTestDimension
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.util.cancel_util import cancel_query_and_validate_state
from tests.verifiers.metric_verifier import MetricVerifier
# Column list that forms the PRIMARY KEY of the lineitem table.
LINEITEM_PK = 'l_orderkey, l_partkey, l_suppkey, l_linenumber'

# Each query under test, keyed to the PRIMARY KEY used when the query is wrapped in a
# CTAS statement on Kudu. A value of None means the query is not run as a CTAS on Kudu.
# The TPC-H dataset is used because its tables are large, so queries take a while to
# execute.
QUERIES = {
    'select l_returnflag from lineitem': None,
    'select count(l_returnflag) pk from lineitem': 'pk',
    'select * from lineitem limit 50': LINEITEM_PK,
    'compute stats lineitem': None,
    'select * from lineitem order by l_orderkey': LINEITEM_PK,
}

QUERY_TYPE = ['SELECT', 'CTAS']

# Seconds to sleep between issuing a query and cancelling it. Small values are favored
# because races tend to show up more often when the gap between RPCs is short.
CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]

# How many times each query under test is executed and cancelled.
NUM_CANCELATION_ITERATIONS = 1

# Cancellation is tested on both running and hung queries. Node ID 0 is the scan node.
WAIT_ACTIONS = [None, '0:GETNEXT:WAIT']

# Checks that failed CancelFInstances() RPCs do not leave queries hung.
FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL']

# Cancellation is also tested with a resource limit in place.
CPU_LIMIT_S = [0, 100000]

# Checks a close rpc running concurrently with a fetch rpc. The two cases verify:
#   False: the close and fetch rpcs run concurrently.
#   True: the cancel rpc alone is enough to unblock the fetch rpc.
JOIN_BEFORE_CLOSE = [False, True]

# Additional dimensions for testing an order by without a limit.
SORT_QUERY = 'select * from lineitem order by l_orderkey'
SORT_CANCEL_DELAY = range(6, 10)
# Covers both spilling and non-spilling sorts.
SORT_BUFFER_POOL_LIMIT = ['0', '300m']
class TestCancellation(ImpalaTestSuite):
    """Shared machinery for the query cancellation tests.

    Builds the test matrix (query, query type, cancel delay, debug actions, resource
    limits) and provides execute_cancel_test(), which the test methods of the
    subclasses in this file call with a test vector.
    """

    @classmethod
    def get_workload(cls):
        """Return the name of the workload these tests run against."""
        return 'tpch'

    @classmethod
    def add_test_dimensions(cls):
        """Add the cancellation dimensions and constraints to the test matrix.

        As a side effect, raises the module-level NUM_CANCELATION_ITERATIONS to 3 when
        the exploration strategy is anything other than 'core'.
        """
        # The assignment at the end of this method has to rebind the module-level
        # constant that execute_cancel_test() reads. Without this declaration it would
        # only create an unused local variable and the iteration count would stay at 1.
        global NUM_CANCELATION_ITERATIONS

        super(TestCancellation, cls).add_test_dimensions()
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('query', *QUERIES.keys()))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('query_type', *QUERY_TYPE))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('wait_action', *WAIT_ACTIONS))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('fail_rpc_action', *FAIL_RPC_ACTIONS))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('buffer_pool_limit', 0))
        cls.ImpalaTestMatrix.add_dimension(
            ImpalaTestDimension('cpu_limit_s', *CPU_LIMIT_S))

        # CTAS is only exercised on uncompressed text, parquet and kudu tables.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('query_type') != 'CTAS' or (
                v.get_value('table_format').file_format in ['text', 'parquet', 'kudu']
                and v.get_value('table_format').compression_codec == 'none'))
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('exec_option')['batch_size'] == 0)
        # Ignore 'compute stats' queries for the CTAS query type.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: not (v.get_value('query_type') == 'CTAS' and
                           v.get_value('query').startswith('compute stats')))
        # Ignore CTAS on Kudu if there is no PRIMARY KEY specified.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: not (v.get_value('query_type') == 'CTAS' and
                           v.get_value('table_format').file_format == 'kudu' and
                           QUERIES[v.get_value('query')] is None))
        # tpch tables are not generated for hbase as the data loading takes a very long
        # time.
        # TODO: Add cancellation tests for hbase.
        cls.ImpalaTestMatrix.add_constraint(
            lambda v: v.get_value('table_format').file_format != 'hbase')

        if cls.exploration_strategy() != 'core':
            NUM_CANCELATION_ITERATIONS = 3

    def cleanup_test_table(self, table_format):
        """Drop the 'ctas_cancel' table used by the CTAS variants, if it exists."""
        self.execute_query("drop table if exists ctas_cancel", table_format=table_format)

    def execute_cancel_test(self, vector):
        """Run the query described by 'vector' and cancel it on every iteration.

        For the CTAS query type the query is wrapped in a 'create table ctas_cancel ...
        as <query>' statement, and the table is dropped before the first run and after
        each cancellation. The vector's debug actions, buffer pool limit and CPU limit
        are copied into its exec options before the query is issued. When no debug
        action is set and the final query text contains 'count' or 'limit', the query
        is executed once more without cancelling it.
        """
        query = vector.get_value('query')
        query_type = vector.get_value('query_type')
        if query_type == "CTAS":
            self.cleanup_test_table(vector.get_value('table_format'))
            file_format = vector.get_value('table_format').file_format
            if file_format == 'kudu':
                assert query in QUERIES and QUERIES[query] is not None, \
                    "PRIMARY KEY for query %s not specified" % query
                query = ("create table ctas_cancel primary key (%s) "
                         "partition by hash partitions 3 stored as kudu as %s" %
                         (QUERIES[query], query))
            else:
                query = "create table ctas_cancel stored as %sfile as %s" % (
                    file_format, query)

        wait_action = vector.get_value('wait_action')
        fail_rpc_action = vector.get_value('fail_rpc_action')

        # Unset (None) actions are dropped, so the result may be an empty string.
        debug_action = "|".join(filter(None, [wait_action, fail_rpc_action]))
        vector.get_value('exec_option')['debug_action'] = debug_action

        vector.get_value('exec_option')['buffer_pool_limit'] = \
            vector.get_value('buffer_pool_limit')
        vector.get_value('exec_option')['cpu_limit_s'] = vector.get_value('cpu_limit_s')

        # Execute the query multiple times, cancelling it each time.
        for _ in range(NUM_CANCELATION_ITERATIONS):
            cancel_query_and_validate_state(
                self.client, query,
                vector.get_value('exec_option'), vector.get_value('table_format'),
                vector.get_value('cancel_delay'), vector.get_value('join_before_close'))

            if query_type == "CTAS":
                self.cleanup_test_table(vector.get_value('table_format'))

        # Executing the same query without canceling should work fine. Only do this if
        # the query has a limit or aggregation
        if not debug_action and ('count' in query or 'limit' in query):
            self.execute_query(query, vector.get_value('exec_option'))

    @pytest.mark.execute_serially
    def test_misformatted_profile_text(self):
        """Tests that canceled queries have no whitespace formatting errors in their
        profiles (IMPALA-2063). Executes serially because it is timing-dependent and can
        be flaky."""
        query = ("select count(*) from functional_parquet.alltypes "
                 "where bool_col = sleep(100)")
        client = self.hs2_client
        # Start query
        handle = client.execute_async(query)
        # Wait for the query to start (with a long timeout to account for admission
        # control queuing). sleep() returns None, so a failed state check pauses and
        # then lets any() move on to the next poll.
        WAIT_SECONDS = 60 * 30
        assert any(client.get_state(handle) == 'RUNNING_STATE' or sleep(0.1)
                   for _ in range(10 * WAIT_SECONDS)), 'Query failed to start'

        client.cancel(handle)
        # Wait up to 5 seconds for the query to get cancelled
        # TODO(IMPALA-1262): This should be CANCELED_STATE
        # TODO(IMPALA-8411): Remove and assert that the query is cancelled immediately
        assert any(client.get_state(handle) == 'ERROR_STATE' or sleep(1)
                   for _ in range(5)), 'Query failed to cancel'
        # Get profile and check for formatting errors
        profile = client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT)
        for (k, v) in profile.nodes[1].info_strings.items():
            # Ensure that whitespace gets removed from values.
            assert v == v.rstrip(), \
                "Profile value contains surrounding whitespace: %s %s" % (k, v)
            # Plan text may be strangely formatted.
            assert k == 'Plan' or '\n\n' not in v, \
                "Profile contains repeating newlines: %s %s" % (k, v)

    def teardown_method(self, method):
        """Pause for one second after every test method."""
        # For some reason it takes a little while for the query to get completely torn
        # down when the debug action is WAIT, causing
        # TestValidateMetrics.test_metrics_are_zero to fail. Introducing a small delay
        # allows everything to quiesce.
        # TODO: Figure out a better way to address this
        sleep(1)
class TestCancellationParallel(TestCancellation):
    """Cancellation tests for every query type except CTAS."""

    @classmethod
    def add_test_dimensions(cls):
        """Extend the inherited matrix with a constraint that filters out CTAS."""
        super(TestCancellationParallel, cls).add_test_dimensions()

        def is_not_ctas(vec):
            return vec.get_value('query_type') != 'CTAS'

        cls.ImpalaTestMatrix.add_constraint(is_not_ctas)

    def test_cancel_select(self, vector):
        """Run the shared cancellation routine for one test vector."""
        self.execute_cancel_test(vector)
class TestCancellationSerial(TestCancellation):
    """Cancellation tests for CTAS statements, marked to execute serially."""

    @classmethod
    def add_test_dimensions(cls):
        """Restrict the inherited matrix to CTAS and trim the slower combinations."""
        super(TestCancellationSerial, cls).add_test_dimensions()
        matrix = cls.ImpalaTestMatrix

        # This suite only runs the insert tests: they have to be serial so that file
        # handle leaks can be checked for.
        matrix.add_constraint(lambda vec: vec.get_value('query_type') == 'CTAS')

        # Serial execution makes this suite slow, so parameters that are not
        # interesting for inserts are pinned to their first value.
        matrix.add_constraint(
            lambda vec: vec.get_value('cpu_limit_s') == CPU_LIMIT_S[0])
        matrix.add_constraint(
            lambda vec: vec.get_value('join_before_close') == JOIN_BEFORE_CLOSE[0])

        if cls.exploration_strategy() == 'exhaustive':
            matrix.add_constraint(
                lambda vec: vec.get_value('cancel_delay') != CANCEL_DELAY_IN_SECONDS[0])
        else:
            # Outside of exhaustive runs only a single 'cancel_delay' option is used.
            matrix.add_constraint(
                lambda vec: vec.get_value('cancel_delay') == CANCEL_DELAY_IN_SECONDS[3])

    @pytest.mark.execute_serially
    def test_cancel_insert(self, vector):
        """Run the shared cancellation routine, then verify no files are left open."""
        self.execute_cancel_test(vector)
        MetricVerifier(self.impalad_test_service).verify_no_open_files(timeout=10)
class TestCancellationFullSort(TestCancellation):
    """Cancellation tests for the order-by-without-limit query."""

    @classmethod
    def add_test_dimensions(cls):
        """Narrow the inherited matrix to the sort query on uncompressed parquet."""
        super(TestCancellationFullSort, cls).add_test_dimensions()
        matrix = cls.ImpalaTestMatrix

        # Override dimensions so that only the order-by without limit query executes.
        overrides = (
            ('query', (SORT_QUERY,)),
            ('query_type', ('SELECT',)),
            ('cancel_delay', SORT_CANCEL_DELAY),
            ('buffer_pool_limit', SORT_BUFFER_POOL_LIMIT),
        )
        for dimension_name, dimension_values in overrides:
            matrix.add_dimension(ImpalaTestDimension(dimension_name, *dimension_values))

        matrix.add_constraint(
            lambda vec: vec.get_value('fail_rpc_action') == FAIL_RPC_ACTIONS[0])

        def is_uncompressed_parquet(vec):
            table_format = vec.get_value('table_format')
            return (table_format.file_format == 'parquet' and
                    table_format.compression_codec == 'none')

        matrix.add_constraint(is_uncompressed_parquet)

    def test_cancel_sort(self, vector):
        """Run the shared cancellation routine for one test vector."""
        self.execute_cancel_test(vector)