blob: 5c08d6ac3adf9bcbb818485fced55b6aa4415b8d [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import threading
from time import sleep
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite
def cancel_query_and_validate_state(client, query, exec_option, table_format,
cancel_delay, join_before_close=False):
"""Runs the given query asynchronously and then cancels it after the specified delay.
The query is run with the given 'exec_options' against the specified 'table_format'. A
separate async thread is launched to fetch the results of the query. The method
validates that the query was successfully cancelled and that the error messages for the
calls to ImpalaConnection#fetch and #close are consistent. If 'join_before_close' is
True the method will join against the fetch results thread before closing the query.
"""
if exec_option: client.set_configuration(exec_option)
if table_format: ImpalaTestSuite.change_database(client, table_format)
handle = client.execute_async(query)
thread = threading.Thread(target=__fetch_results, args=(query, handle))
thread.start()
sleep(cancel_delay)
assert client.get_state(handle) != client.QUERY_STATES['EXCEPTION']
cancel_result = client.cancel(handle)
assert cancel_result.status_code == 0,\
'Unexpected status code from cancel request: %s' % cancel_result
if join_before_close:
thread.join()
close_error = None
try:
client.close_query(handle)
except ImpalaBeeswaxException as e:
close_error = e
# Before accessing fetch_results_error we need to join the fetch thread
thread.join()
if thread.fetch_results_error is None:
# If the fetch rpc didn't result in CANCELLED (and auto-close the query) then
# the close rpc should have succeeded.
assert close_error is None
elif close_error is None:
# If the close rpc succeeded, then the fetch rpc should have either succeeded,
# failed with 'Cancelled' or failed with 'Invalid query handle' (if the close
# rpc occured before the fetch rpc).
if thread.fetch_results_error is not None:
assert 'Cancelled' in str(thread.fetch_results_error) or \
('Invalid query handle' in str(thread.fetch_results_error)
and not join_before_close)
else:
# If the close rpc encountered an exception, then it must be due to fetch
# noticing the cancellation and doing the auto-close.
assert 'Invalid or unknown query handle' in str(close_error)
assert 'Cancelled' in str(thread.fetch_results_error)
# TODO: Add some additional verification to check to make sure the query was
# actually canceled
def __fetch_results(query, handle):
threading.current_thread().fetch_results_error = None
threading.current_thread().query_profile = None
try:
new_client = ImpalaTestSuite.create_impala_client()
new_client.fetch(query, handle)
except ImpalaBeeswaxException as e:
threading.current_thread().fetch_results_error = e