blob: 91be5d456b42dac025c18c40c67ae4ae41343762 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Tests query cancellation using the ImpalaService.Cancel API
#
import pytest
import threading
from time import sleep
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.test_vector import ImpalaTestDimension
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.verifiers.metric_verifier import MetricVerifier
# PRIMARY KEY for lineitem
LINEITEM_PK = 'l_orderkey, l_partkey, l_suppkey, l_linenumber'
# Queries to execute, mapped to a unique PRIMARY KEY for use in CTAS with Kudu. If None
# is specified for the PRIMARY KEY, it will not be used in a CTAS statement on Kudu.
# Use the TPC-H dataset because tables are large so queries take some time to execute.
QUERIES = {'select l_returnflag from lineitem' : None,
'select count(l_returnflag) pk from lineitem' : 'pk',
'select * from lineitem limit 50' : LINEITEM_PK,
'compute stats lineitem' : None,
'select * from lineitem order by l_orderkey' : LINEITEM_PK}
QUERY_TYPE = ["SELECT", "CTAS"]
# Time to sleep between issuing query and canceling
CANCEL_DELAY_IN_SECONDS = range(5)
# Number of times to execute/cancel each query under test
NUM_CANCELATION_ITERATIONS = 1
# Test cancellation on both running and hung queries
DEBUG_ACTIONS = [None, 'WAIT']
# Extra dimensions to test order by without limit
SORT_QUERY = 'select * from lineitem order by l_orderkey'
SORT_CANCEL_DELAY = range(6, 10)
SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and non-spilling sorts.
class TestCancellation(ImpalaTestSuite):
@classmethod
def get_workload(self):
return 'tpch'
@classmethod
def add_test_dimensions(cls):
super(TestCancellation, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('query', *QUERIES.keys()))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('query_type', *QUERY_TYPE))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('action', *DEBUG_ACTIONS))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('buffer_pool_limit', 0))
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('query_type') != 'CTAS' or (\
v.get_value('table_format').file_format in ['text', 'parquet', 'kudu'] and\
v.get_value('table_format').compression_codec == 'none'))
cls.ImpalaTestMatrix.add_constraint(
lambda v: v.get_value('exec_option')['batch_size'] == 0)
# Ignore 'compute stats' queries for the CTAS query type.
cls.ImpalaTestMatrix.add_constraint(
lambda v: not (v.get_value('query_type') == 'CTAS' and
v.get_value('query').startswith('compute stats')))
# Ignore CTAS on Kudu if there is no PRIMARY KEY specified.
cls.ImpalaTestMatrix.add_constraint(
lambda v: not (v.get_value('query_type') == 'CTAS' and
v.get_value('table_format').file_format == 'kudu' and
QUERIES[v.get_value('query')] is None))
# tpch tables are not generated for hbase as the data loading takes a very long time.
# TODO: Add cancellation tests for hbase.
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format != 'hbase')
if cls.exploration_strategy() != 'core':
NUM_CANCELATION_ITERATIONS = 3
def cleanup_test_table(self, table_format):
self.execute_query("drop table if exists ctas_cancel", table_format=table_format)
def execute_cancel_test(self, vector):
query = vector.get_value('query')
query_type = vector.get_value('query_type')
if query_type == "CTAS":
self.cleanup_test_table(vector.get_value('table_format'))
file_format = vector.get_value('table_format').file_format
if file_format == 'kudu':
assert QUERIES.has_key(query) and QUERIES[query] is not None,\
"PRIMARY KEY for query %s not specified" % query
query = "create table ctas_cancel primary key (%s) "\
"partition by hash partitions 3 stored as kudu as %s" %\
(QUERIES[query], query)
else:
query = "create table ctas_cancel stored as %sfile as %s" %\
(file_format, query)
action = vector.get_value('action')
# node ID 0 is the scan node
debug_action = '0:GETNEXT:' + action if action != None else ''
vector.get_value('exec_option')['debug_action'] = debug_action
vector.get_value('exec_option')['buffer_pool_limit'] =\
vector.get_value('buffer_pool_limit')
# Execute the query multiple times, cancelling it each time.
for i in xrange(NUM_CANCELATION_ITERATIONS):
handle = self.execute_query_async(query, vector.get_value('exec_option'),
table_format=vector.get_value('table_format'))
def fetch_results():
threading.current_thread().fetch_results_error = None
try:
new_client = self.create_impala_client()
new_client.fetch(query, handle)
except ImpalaBeeswaxException as e:
threading.current_thread().fetch_results_error = e
thread = threading.Thread(target=fetch_results)
thread.start()
sleep(vector.get_value('cancel_delay'))
assert self.client.get_state(handle) != self.client.QUERY_STATES['EXCEPTION']
cancel_result = self.client.cancel(handle)
assert cancel_result.status_code == 0,\
'Unexpected status code from cancel request: %s' % cancel_result
thread.join()
if thread.fetch_results_error is None:
# If the query is cancelled while it's in the fetch rpc, it gets unregistered and
# therefore closed. Only call close on queries that did not fail fetch.
self.client.close_query(handle)
elif 'Cancelled' not in str(thread.fetch_results_error):
# If fetch failed for any reason other than cancellation, raise the error.
raise thread.fetch_results_error
if query_type == "CTAS":
self.cleanup_test_table(vector.get_value('table_format'))
# TODO: Add some additional verification to check to make sure the query was
# actually canceled
# Executing the same query without canceling should work fine. Only do this if the
# query has a limit or aggregation
if action is None and ('count' in query or 'limit' in query):
self.execute_query(query, vector.get_value('exec_option'))
def teardown_method(self, method):
# For some reason it takes a little while for the query to get completely torn down
# when the debug action is WAIT, causing TestValidateMetrics.test_metrics_are_zero to
# fail. Introducing a small delay allows everything to quiesce.
# TODO: Figure out a better way to address this
sleep(1)
class TestCancellationParallel(TestCancellation):
@classmethod
def add_test_dimensions(cls):
super(TestCancellationParallel, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('query_type') != 'CTAS')
def test_cancel_select(self, vector):
self.execute_cancel_test(vector)
class TestCancellationSerial(TestCancellation):
@classmethod
def add_test_dimensions(cls):
super(TestCancellationSerial, cls).add_test_dimensions()
cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('query_type') == 'CTAS' or
v.get_value('query').startswith('compute stats'))
cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('cancel_delay') != 0)
cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('action') is None)
# Don't run across all cancel delay options unless running in exhaustive mode
if cls.exploration_strategy() != 'exhaustive':
cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('cancel_delay') in [3])
@pytest.mark.execute_serially
def test_cancel_insert(self, vector):
self.execute_cancel_test(vector)
metric_verifier = MetricVerifier(self.impalad_test_service)
try:
metric_verifier.verify_no_open_files(timeout=10)
except AssertionError:
pytest.xfail("IMPALA-551: File handle leak for INSERT")
class TestCancellationFullSort(TestCancellation):
@classmethod
def add_test_dimensions(cls):
super(TestCancellation, cls).add_test_dimensions()
# Override dimensions to only execute the order-by without limit query.
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('query', SORT_QUERY))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('query_type', 'SELECT'))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('cancel_delay', *SORT_CANCEL_DELAY))
cls.ImpalaTestMatrix.add_dimension(
ImpalaTestDimension('buffer_pool_limit', *SORT_BUFFER_POOL_LIMIT))
cls.ImpalaTestMatrix.add_dimension(ImpalaTestDimension('action', None))
cls.ImpalaTestMatrix.add_constraint(lambda v:\
v.get_value('table_format').file_format =='parquet' and\
v.get_value('table_format').compression_codec == 'none')
def test_cancel_sort(self, vector):
self.execute_cancel_test(vector)