| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| # Tests query cancellation using the ImpalaService.Cancel API |
| # |
| |
| import pytest |
| import threading |
| from time import sleep |
| from RuntimeProfile.ttypes import TRuntimeProfileFormat |
| from tests.beeswax.impala_beeswax import ImpalaBeeswaxException |
| from tests.common.test_vector import ImpalaTestDimension |
| from tests.common.impala_test_suite import ImpalaTestSuite |
| from tests.util.cancel_util import cancel_query_and_validate_state |
| from tests.verifiers.metric_verifier import MetricVerifier |
| |
| # PRIMARY KEY for lineitem |
| LINEITEM_PK = 'l_orderkey, l_partkey, l_suppkey, l_linenumber' |
| |
| # Queries to execute, mapped to a unique PRIMARY KEY for use in CTAS with Kudu. If None |
| # is specified for the PRIMARY KEY, it will not be used in a CTAS statement on Kudu. |
| # Use the TPC-H dataset because tables are large so queries take some time to execute. |
| QUERIES = {'select l_returnflag from lineitem' : None, |
| 'select count(l_returnflag) pk from lineitem' : 'pk', |
| 'select * from lineitem limit 50' : LINEITEM_PK, |
| 'compute stats lineitem' : None, |
| 'select * from lineitem order by l_orderkey' : LINEITEM_PK} |
| |
| QUERY_TYPE = ["SELECT", "CTAS"] |
| |
| # Time to sleep between issuing query and canceling. Favor small times since races |
| # are prone to occur more often when the time between RPCs is small. |
| CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4] |
| |
| # Number of times to execute/cancel each query under test |
| NUM_CANCELATION_ITERATIONS = 1 |
| |
| # Test cancellation on both running and hung queries. Node ID 0 is the scan node |
| WAIT_ACTIONS = [None, '0:GETNEXT:WAIT'] |
| |
| # Verify that failed CancelFInstances() RPCs don't lead to hung queries |
| FAIL_RPC_ACTIONS = [None, 'COORD_CANCEL_QUERY_FINSTANCES_RPC:FAIL'] |
| |
| # Test cancelling when there is a resource limit. |
| CPU_LIMIT_S = [0, 100000] |
| |
| # Verify close rpc running concurrently with fetch rpc. The two cases verify: |
| # False: close and fetch rpc run concurrently. |
| # True: cancel rpc is enough to ensure that the fetch rpc is unblocked. |
| JOIN_BEFORE_CLOSE = [False, True] |
| |
| # Extra dimensions to test order by without limit |
| SORT_QUERY = 'select * from lineitem order by l_orderkey' |
| SORT_CANCEL_DELAY = range(6, 10) |
| SORT_BUFFER_POOL_LIMIT = ['0', '300m'] # Test spilling and non-spilling sorts. |
| |
| class TestCancellation(ImpalaTestSuite): |
| @classmethod |
| def get_workload(self): |
| return 'tpch' |
| |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestCancellation, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('query', *QUERIES.keys())) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('query_type', *QUERY_TYPE)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('cancel_delay', *CANCEL_DELAY_IN_SECONDS)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('wait_action', *WAIT_ACTIONS)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('fail_rpc_action', *FAIL_RPC_ACTIONS)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('join_before_close', *JOIN_BEFORE_CLOSE)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('buffer_pool_limit', 0)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('cpu_limit_s', *CPU_LIMIT_S)) |
| |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('query_type') != 'CTAS' or (\ |
| v.get_value('table_format').file_format in ['text', 'parquet', 'kudu'] and\ |
| v.get_value('table_format').compression_codec == 'none')) |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('exec_option')['batch_size'] == 0) |
| # Ignore 'compute stats' queries for the CTAS query type. |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: not (v.get_value('query_type') == 'CTAS' and |
| v.get_value('query').startswith('compute stats'))) |
| |
| # Ignore CTAS on Kudu if there is no PRIMARY KEY specified. |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: not (v.get_value('query_type') == 'CTAS' and |
| v.get_value('table_format').file_format == 'kudu' and |
| QUERIES[v.get_value('query')] is None)) |
| |
| # tpch tables are not generated for hbase as the data loading takes a very long time. |
| # TODO: Add cancellation tests for hbase. |
| cls.ImpalaTestMatrix.add_constraint(lambda v:\ |
| v.get_value('table_format').file_format != 'hbase') |
| if cls.exploration_strategy() != 'core': |
| NUM_CANCELATION_ITERATIONS = 3 |
| |
| def cleanup_test_table(self, table_format): |
| self.execute_query("drop table if exists ctas_cancel", table_format=table_format) |
| |
| def execute_cancel_test(self, vector): |
| query = vector.get_value('query') |
| query_type = vector.get_value('query_type') |
| if query_type == "CTAS": |
| self.cleanup_test_table(vector.get_value('table_format')) |
| file_format = vector.get_value('table_format').file_format |
| if file_format == 'kudu': |
| assert QUERIES.has_key(query) and QUERIES[query] is not None,\ |
| "PRIMARY KEY for query %s not specified" % query |
| query = "create table ctas_cancel primary key (%s) "\ |
| "partition by hash partitions 3 stored as kudu as %s" %\ |
| (QUERIES[query], query) |
| else: |
| query = "create table ctas_cancel stored as %sfile as %s" %\ |
| (file_format, query) |
| |
| wait_action = vector.get_value('wait_action') |
| fail_rpc_action = vector.get_value('fail_rpc_action') |
| |
| debug_action = "|".join(filter(None, [wait_action, fail_rpc_action])) |
| vector.get_value('exec_option')['debug_action'] = debug_action |
| |
| vector.get_value('exec_option')['buffer_pool_limit'] =\ |
| vector.get_value('buffer_pool_limit') |
| vector.get_value('exec_option')['cpu_limit_s'] = vector.get_value('cpu_limit_s') |
| |
| # Execute the query multiple times, cancelling it each time. |
| for i in xrange(NUM_CANCELATION_ITERATIONS): |
| cancel_query_and_validate_state(self.client, query, |
| vector.get_value('exec_option'), vector.get_value('table_format'), |
| vector.get_value('cancel_delay'), vector.get_value('join_before_close')) |
| |
| if query_type == "CTAS": |
| self.cleanup_test_table(vector.get_value('table_format')) |
| |
| # Executing the same query without canceling should work fine. Only do this if the |
| # query has a limit or aggregation |
| if not debug_action and ('count' in query or 'limit' in query): |
| self.execute_query(query, vector.get_value('exec_option')) |
| |
| @pytest.mark.execute_serially |
| def test_misformatted_profile_text(self): |
| """Tests that canceled queries have no whitespace formatting errors in their profiles |
| (IMPALA-2063). Executes serially because it is timing-dependent and can be flaky.""" |
| query = "select count(*) from functional_parquet.alltypes where bool_col = sleep(100)" |
| client = self.hs2_client |
| # Start query |
| handle = client.execute_async(query) |
| # Wait for the query to start (with a long timeout to account for admission control |
| # queuing). |
| WAIT_SECONDS = 60 * 30 |
| assert any(client.get_state(handle) == 'RUNNING_STATE' or sleep(0.1) |
| for _ in range(10 * WAIT_SECONDS)), 'Query failed to start' |
| |
| client.cancel(handle) |
| # Wait up to 5 seconds for the query to get cancelled |
| # TODO(IMPALA-1262): This should be CANCELED_STATE |
| # TODO(IMPALA-8411): Remove and assert that the query is cancelled immediately |
| assert any(client.get_state(handle) == 'ERROR_STATE' or sleep(1) |
| for _ in range(5)), 'Query failed to cancel' |
| # Get profile and check for formatting errors |
| profile = client.get_runtime_profile(handle, TRuntimeProfileFormat.THRIFT) |
| for (k, v) in profile.nodes[1].info_strings.iteritems(): |
| # Ensure that whitespace gets removed from values. |
| assert v == v.rstrip(), \ |
| "Profile value contains surrounding whitespace: %s %s" % (k, v) |
| # Plan text may be strangely formatted. |
| assert k == 'Plan' or '\n\n' not in v, \ |
| "Profile contains repeating newlines: %s %s" % (k, v) |
| |
| def teardown_method(self, method): |
| # For some reason it takes a little while for the query to get completely torn down |
| # when the debug action is WAIT, causing TestValidateMetrics.test_metrics_are_zero to |
| # fail. Introducing a small delay allows everything to quiesce. |
| # TODO: Figure out a better way to address this |
| sleep(1) |
| |
| |
| class TestCancellationParallel(TestCancellation): |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestCancellationParallel, cls).add_test_dimensions() |
| cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('query_type') != 'CTAS') |
| |
| def test_cancel_select(self, vector): |
| self.execute_cancel_test(vector) |
| |
| class TestCancellationSerial(TestCancellation): |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestCancellationSerial, cls).add_test_dimensions() |
| # Only run the insert tests in this suite - they need to be serial to allow us to |
| # check for file handle leaks. |
| cls.ImpalaTestMatrix.add_constraint(lambda v: v.get_value('query_type') == 'CTAS') |
| |
| # This test suite is slow because it executes serially. Restrict some of the params |
| # that are not interesting for inserts. |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('cpu_limit_s') == CPU_LIMIT_S[0]) |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('join_before_close') == JOIN_BEFORE_CLOSE[0]) |
| if cls.exploration_strategy() != 'exhaustive': |
| # Only run a single 'cancel_delay' option in core. |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('cancel_delay') == CANCEL_DELAY_IN_SECONDS[3]) |
| else: |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('cancel_delay') != CANCEL_DELAY_IN_SECONDS[0]) |
| |
| @pytest.mark.execute_serially |
| def test_cancel_insert(self, vector): |
| self.execute_cancel_test(vector) |
| metric_verifier = MetricVerifier(self.impalad_test_service) |
| metric_verifier.verify_no_open_files(timeout=10) |
| |
| class TestCancellationFullSort(TestCancellation): |
| @classmethod |
| def add_test_dimensions(cls): |
| super(TestCancellationFullSort, cls).add_test_dimensions() |
| # Override dimensions to only execute the order-by without limit query. |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('query', SORT_QUERY)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('query_type', 'SELECT')) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('cancel_delay', *SORT_CANCEL_DELAY)) |
| cls.ImpalaTestMatrix.add_dimension( |
| ImpalaTestDimension('buffer_pool_limit', *SORT_BUFFER_POOL_LIMIT)) |
| cls.ImpalaTestMatrix.add_constraint( |
| lambda v: v.get_value('fail_rpc_action') == FAIL_RPC_ACTIONS[0]) |
| cls.ImpalaTestMatrix.add_constraint(lambda v:\ |
| v.get_value('table_format').file_format =='parquet' and\ |
| v.get_value('table_format').compression_codec == 'none') |
| |
| def test_cancel_sort(self, vector): |
| self.execute_cancel_test(vector) |