blob: 34e83ae92d4f24b7e2aa364b31f636c9294d9fb0 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# TODO: Query retries can be triggered in several different ways, and some tests
# pick one approach for no particular reason, try to consolidate the different
# ways that retries are triggered.
# TODO: Re-factor tests into multiple classes.
# TODO: Add a test that cancels queries while a retry is running
import pytest
import re
import shutil
import tempfile
import time
from random import randint
from RuntimeProfile.ttypes import TRuntimeProfileFormat
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
from tests.common.impala_test_suite import ImpalaTestSuite, LOG
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.errors import Timeout
from tests.common.skip import (SkipIfEC, SkipIfBuildType, SkipIfFS,
# The BE krpc port of the impalad to simulate rpc or disk errors in tests.
def _get_rpc_fail_action(port):
return "IMPALA_SERVICE_POOL:{port}:ExecQueryFInstances:FAIL" \
def _get_disk_fail_action(port):
return "IMPALA_TMP_FILE_WRITE:{port}:FAIL".format(port=port)
# All tests in this class have SkipIfEC because all tests run a query and expect
# the query to be retried when killing a random impalad. On EC this does not always work
# because many queries that might run on three impalads for HDFS / S3 builds, might only
# run on two instances on EC builds. The difference is that EC creates smaller tables
# compared to data stored on HDFS / S3. If the query is only run on two instances, then
# randomly killing one impalad won't necessarily trigger a retry of the query.
class TestQueryRetries(CustomClusterTestSuite):
# A query that shuffles a lot of data. Useful when testing query retries since it
# ensures that a query fails during a TransmitData RPC. The RPC failure will cause the
# target impalad to be blacklisted and the query to be retried. The query also has to
# run long enough so that it fails when an impalad is killed.
_shuffle_heavy_query = "select * from tpch.lineitem t1, tpch.lineitem t2 where " \
"t1.l_orderkey = t2.l_orderkey order by t1.l_orderkey, t2.l_orderkey limit 1"
_shuffle_heavy_query_results = "1\t155190\t7706\t1\t17.00\t21168.23\t0.04\t0.02\tN" \
"\tO\t1996-03-13\t1996-02-12\t1996-03-22\tDELIVER IN PERSON\tTRUCK" \
"\tegular courts above the\t1\t15635\t638\t6\t32.00\t49620.16\t0.07\t0.02\tN\tO" \
"\t1996-01-30\t1996-02-07\t1996-02-03\tDELIVER IN PERSON\tMAIL\tarefully slyly ex"
# The following query has two union operands. The first operand executes quickly
# and the second one executes slowly. So we can kill one impalad when some results
# are ready and the query is still running and has more results.
_union_query = """select count(*) from functional.alltypestiny
union all
select count(*) from functional.alltypes where bool_col = sleep(50)"""
# A simple count query with predicate. The predicate is needed so that the planner does
# not create the optimized count(star) query plan.
_count_query = "select count(*) from tpch_parquet.lineitem where l_orderkey < 50"
_count_query_result = "55"
def get_workload(cls):
return 'functional-query'
def test_retries_from_cancellation_pool(self, cursor):
"""Tests that queries are retried instead of cancelled if one of the nodes leaves the
cluster. The retries are triggered by the cancellation pool in the ImpalaServer. The
cancellation pool listens for updates from the statestore and kills all queries that
are running on any nodes that are no longer part of the cluster membership."""
# The following query executes slowly, and does minimal TransmitData RPCs, so it is
# likely that the statestore detects that the impalad has been killed before a
# TransmitData RPC has occurred.
query = "select count(*) from functional.alltypes where bool_col = sleep(50)"
# Launch the query, wait for it to start running, and then kill an impalad.
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60)
# Kill a random impalad (but not the one executing the actual query).
# Validate the query results.
results = self.client.fetch(query, handle)
assert results.success
assert len( == 1
assert int([0]) == 3650
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# Validate the state of the runtime profiles.
retried_runtime_profile = self.client.get_runtime_profile(handle)
retried_runtime_profile, handle.get_handle().id, retried_query_id)
# Validate the state of the client log.
self.__validate_client_log(handle, retried_query_id)
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
def test_kill_impalad_expect_retry(self):
"""Launch a query, wait for it to start running, kill a random impalad and then
validate that the query has successfully been retried. Increase the statestore
heartbeat frequency so that the query actually fails during execution. Otherwise, it
is possible the statestore detects that the killed impalad has crashed before the
query does. This is necessary since this test specifically attempts to test the code
that retries queries when a query fails mid-execution."""
# Launch a query, it should be retried.
handle = self.execute_query_async(self._shuffle_heavy_query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60)
# Kill a random impalad.
killed_impalad = self.__kill_random_impalad()
# Assert that the query succeeded and returned the correct results.
results = self.client.fetch(self._shuffle_heavy_query, handle)
assert results.success
assert len( == 1
assert self._shuffle_heavy_query_results in[0]
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
# Assert that the killed impalad shows up in the list of blacklisted executors from
# the runtime profile.
self.__assert_executors_blacklisted(killed_impalad, retried_runtime_profile)
# Validate the state of the runtime profiles.
retried_runtime_profile, handle.get_handle().id, retried_query_id)
# Validate the state of the client log.
self.__validate_client_log(handle, retried_query_id)
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
# Assert that the web ui shows all queries are complete.
completed_queries = self.cluster.get_first_impalad().service.get_completed_queries()
# Assert that the most recently completed query is the retried query and it is marked
assert completed_queries[0]['state'] == 'FINISHED'
assert completed_queries[0]['query_id'] == self.__get_query_id_from_profile(
# Assert that the second most recently completed query is the original query and it is
# marked as 'RETRIED'.
assert completed_queries[1]['state'] == 'RETRIED'
assert completed_queries[1]['query_id'] == handle.get_handle().id
def test_kill_impalad_expect_retries(self):
"""Similar to 'test_kill_impalad_expect_retry' except it runs multiple queries in
parallel and then kills an impalad. Several of the code comments in
'test_kill_impalad_expect_retry' apply here as well."""
# Launch a set of concurrent queries.
num_concurrent_queries = 3
handles = []
for _ in xrange(num_concurrent_queries):
handle = self.execute_query_async(self._shuffle_heavy_query,
query_options={'retry_failed_queries': 'true'})
# Wait for each query to start running.
running_state = self.client.QUERY_STATES['RUNNING']
map(lambda handle: self.wait_for_state(handle, running_state, 60), handles)
# Kill a random impalad.
killed_impalad = self.__kill_random_impalad()
# Fetch and validate the results from each query.
for handle in handles:
results = self.client.fetch(self._shuffle_heavy_query, handle)
assert results.success
assert len( == 1
assert self._shuffle_heavy_query_results in[0]
# Validate the runtime profiles of each query.
for handle in handles:
retried_runtime_profile = self.client.get_runtime_profile(handle)
self.__assert_executors_blacklisted(killed_impalad, retried_runtime_profile)
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
retried_runtime_profile, handle.get_handle().id, retried_query_id)
self.__validate_client_log(handle, retried_query_id)
# Validate the state of the Web UI.
def test_retry_exec_rpc_failure(self):
"""Test ExecFInstance RPC failures. Set a really high statestort heartbeat frequency
so that killed impalads are not removed from the cluster membership. This will cause
Impala to still attempt an Exec RPC to the failed node, which should trigger a
impalad_service = self.cluster.get_first_impalad().service
# Kill an impalad, and run a query. The query should be retried.
killed_impalad = self.__kill_random_impalad()
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
# Validate that the query was retried.
self.__validate_runtime_profiles_from_service(impalad_service, handle)
# Assert that the query succeeded and returned the correct results.
results = self.client.fetch(query, handle)
assert results.success
assert len( == 1
assert self._count_query_result in[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
# Assert that the killed impalad shows up in the list of blacklisted executors from
# the runtime profile.
self.__assert_executors_blacklisted(killed_impalad, retried_runtime_profile)
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# Validate the state of the runtime profiles.
retried_runtime_profile, handle.get_handle().id, retried_query_id)
# Validate the state of the client log.
self.__validate_client_log(handle, retried_query_id)
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT),
statestored_args="--statestore_heartbeat_frequency_ms=1000 \
def test_retry_exec_rpc_failure_before_admin_delay(self):
"""Test retried query triggered by RPC failures by simulating RPC errors at the port
of the 2nd node in the cluster. Simulate admission delay for query with debug_action
so that the 2nd node is removed from the blacklist when making schedule for retried
query. Verify that retried query is executed successfully, while the 2nd node is not
in the executor blacklist and it is not assigned to any fragment instance."""
impalad_service = self.cluster.get_first_impalad().service
rpc_not_accessible_impalad = self.cluster.impalads[1]
assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
# The 2nd node cannot be accessible through KRPC so that it's added to blacklist
# and the query should be retried. Add delay before admission so that the 2nd node
# is removed from the blacklist before scheduler makes schedule for the retried
# query.
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 80)
# Validate that the query was retried.
self.__validate_runtime_profiles_from_service(impalad_service, handle)
# Assert that the query succeeded and returned the correct results.
results = self.client.fetch(query, handle)
assert results.success
assert len( == 1
assert self._count_query_result in[0]
# The runtime profile of the retried query.
retried_runtime_profile = self.client.get_runtime_profile(handle)
# Assert that the 2nd node does not show up in the list of blacklisted executors
# from the runtime profile.
# Assert that the 2nd node is not assigned any fragment instance for retried query
# execution.
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# Validate the state of the runtime profiles.
retried_runtime_profile, handle.get_handle().id, retried_query_id)
# Validate the state of the client log.
self.__validate_client_log(handle, retried_query_id)
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
impalad_args="--debug_actions=" + _get_rpc_fail_action(FAILED_KRPC_PORT),
statestored_args="--statestore_heartbeat_frequency_ms=1000 \
cluster_size=2, num_exclusive_coordinators=1)
def test_retry_query_failure_all_executors_blacklisted(self):
"""Test retried query triggered by RPC failures by simulating RPC errors at the port
of the 2nd node, which is the only executor in the cluster. Simulate admission delay
for query with debug_action so that the 2nd node is removed from the blacklist and
added back to executor group when making schedule for retried query. Verify that
retried query fails due to no executor available even the 2nd node is not in the
executor blacklist."""
rpc_not_accessible_impalad = self.cluster.impalads[1]
assert rpc_not_accessible_impalad.service.krpc_port == FAILED_KRPC_PORT
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true',
'debug_action': 'AC_BEFORE_ADMISSION:SLEEP@18000'})
# Wait until the query fails.
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 140)
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# The runtime profile and client log of the retried query, need to be retrieved
# before fetching results, since the failed fetch attempt will close the
# query handle.
retried_runtime_profile = self.client.get_runtime_profile(handle)
self.__validate_client_log(handle, retried_query_id)
# Assert that the query failed since all executors are blacklisted and no executor
# available for scheduling the query to be retried. To keep consistent with the other
# blacklisting logic, Impalad return error message "Admission for query exceeded
# timeout 60000ms in pool default-pool. Queued reason: Waiting for executors to
# start...".
self.client.fetch(self._shuffle_heavy_query, handle)
assert False
except ImpalaBeeswaxException, e:
assert "Admission for query exceeded timeout 60000ms in pool default-pool." \
in str(e)
assert "Queued reason: Waiting for executors to start. Only DDL queries and " \
"queries scheduled only on the coordinator (either NUM_NODES set to 1 " \
"or when small query optimization is triggered) can currently run" in str(e)
assert "Additional Details: Not Applicable" in str(e)
# Assert that the RPC un-reachable impalad not shows up in the list of blacklisted
# executors from the runtime profile.
# Assert that the query id of the original query is in the runtime profile of the
# retried query.
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
def test_multiple_retries(self):
"""Test that a query can only be retried once, and that if the retry attempt fails,
it fails correctly and with the right error message. Multiple retry attempts are
triggered by killing multiple impalads. The final attempt at retrying the query
should indicate that the error was retryable, and that the max retry limit was
# Launch a query, it should be retried.
handle = self.execute_query_async(self._shuffle_heavy_query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['RUNNING'], 60)
# Kill one impalad so that a retry is triggered.
killed_impalad = self.cluster.impalads[1]
# Wait until the retry is running.
self.__wait_until_retry_state(handle, 'RETRIED')
# Kill another impalad so that another retry is attempted.
# Wait until the query fails.
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# The runtime profile and client log of the retried query, need to be retrieved
# before fetching results, since the failed fetch attempt will close the
# query handle.
retried_runtime_profile = self.client.get_runtime_profile(handle)
self.__validate_client_log(handle, retried_query_id)
# Assert that the query failed, since a query can only be retried once.
self.client.fetch(self._shuffle_heavy_query, handle)
assert False
except ImpalaBeeswaxException, e:
assert "Max retry limit was hit. Query was retried 1 time(s)." in str(e)
# Assert that the killed impalad shows up in the list of blacklisted executors from
# the runtime profile.
self.__assert_executors_blacklisted(killed_impalad, retried_runtime_profile)
# Assert that the query id of the original query is in the runtime profile of the
# retried query.
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
def test_retry_fetched_rows(self):
"""Test that query retries are not triggered if some rows have already been
fetched. Run a query, fetch some rows from it, kill one of the impalads that is
running the query, and the validate that another fetch request fails."""
query = "select * from functional.alltypes where bool_col = sleep(500)"
# IMPALA-9856: The retry behavior that is being exercised in this test only applies
# for query without result spooling enabled. Thus, we explicitly set
# spool_query_results as false.
query_options = {'retry_failed_queries': 'true', 'batch_size': '1',
'spool_query_results': 'false'}
handle = self.execute_query_async(query, query_options)
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
self.client.fetch(query, handle, max_rows=1)
# Assert that attempt to fetch from the query handle fails.
self.client.fetch(query, handle)
assert False
except Exception as e:
assert "Failed due to unreachable impalad" in str(e)
assert "Skipping retry of query_id=%s because the client has already " \
"fetched some rows" % handle.get_handle().id in str(e)
def test_spooling_all_results_for_retries(self):
"""Test retryable queries with spool_all_results_for_retries=true will spool all
results when results spooling is enabled."""
handle = self.execute_query_async(self._union_query, query_options={
'retry_failed_queries': 'true', 'spool_query_results': 'true',
'spool_all_results_for_retries': 'true'})
# Fetch one row first.
results = self.client.fetch(self._union_query, handle, max_rows=1)
assert len( == 1
assert int([0]) == 8
# All results are spooled since we are able to fetch some results.
# Killing an impalad should not trigger query retry.
# We are still able to fetch the remaining results.
results = self.client.fetch(self._union_query, handle)
assert len( == 1
assert int([0]) == 3650
# Verify no retry happens
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is None
runtime_profile = self.client.get_runtime_profile(handle)
assert self.__get_query_id_from_profile(runtime_profile) == handle.get_handle().id
def test_query_retry_in_spooling(self):
"""Test retryable queries with results spooling enabled and
spool_all_results_for_retries=true can be safely retried for failures that happen when
it's still spooling the results"""
handle = self.execute_query_async(self._union_query, query_options={
'retry_failed_queries': 'true', 'spool_query_results': 'true',
'spool_all_results_for_retries': 'true'})
# Wait until the first union operand finishes, so some results are spooled.
self.wait_for_progress(handle, 0.1, 60)
# Still able to fetch the correct result since the query is retried.
results = self.client.fetch(self._union_query, handle)
assert len( == 2
assert int([0]) == 8
assert int([1]) == 3650
# Verify the query has been retried
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
def test_retried_query_not_spooling_all_results(self):
"""Test retried query can return results immediately even when results spooling and
spool_all_results_for_retries are enabled in the original query."""
handle = self.execute_query_async(self._union_query, query_options={
'retry_failed_queries': 'true', 'spool_query_results': 'true',
'spool_all_results_for_retries': 'true'})
# Wait until the first union operand finishes and then kill one impalad.
self.wait_for_progress(handle, 0.1, 60)
# Kill one impalad so the query will be retried.
# Verify that we are able to fetch results of the first union operand while the query
# is still executing the second union operand.
results = self.client.fetch(self._union_query, handle, max_rows=1)
assert len( == 1
assert int([0]) == 8
# Assert that the query is still executing the second union operand.
summary = self.client.get_exec_summary(handle)
assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges
def test_query_retry_reaches_spool_limit(self):
"""Test retryable queries with results spooling enabled and
spool_all_results_for_retries=true that reach spooling mem limit will return rows and
skip retry"""
query = "select * from functional.alltypes where bool_col = sleep(500)"
# Set lower values for spill-to-disk configs to force the above query to spill
# spooled results and hit result queue limit.
handle = self.execute_query_async(query, query_options={
'batch_size': 1,
'spool_query_results': True,
'retry_failed_queries': True,
'spool_all_results_for_retries': True,
'min_spillable_buffer_size': 8 * 1024,
'default_spillable_buffer_size': 8 * 1024,
'max_result_spooling_mem': 8 * 1024,
'max_row_size': 8 * 1024,
'max_spilled_result_spooling_mem': 8 * 1024})
# Wait until we can fetch some results
results = self.client.fetch(query, handle, max_rows=1)
assert len( == 1
# PLAN_ROOT_SINK's reservation limit should be set at
plan_root_sink_reservation_limit = "PLAN_ROOT_SINK[\s\S]*?ReservationLimit: 16.00 KB"
profile = self.client.get_runtime_profile(handle)
assert, profile)
# Assert that the query is still executing
summary = self.client.get_exec_summary(handle)
assert summary.progress.num_completed_scan_ranges < summary.progress.total_scan_ranges
self.assert_impalad_log_contains('INFO', 'Cannot spool all results in the allocated'
' result spooling space. Query retry will be skipped if any results have been '
'returned.', expected_count=1)
# Kill one impalad and assert that the query is not retried.
self.client.fetch(query, handle)
assert False, "fetch should fail"
except ImpalaBeeswaxException as e:
assert "Failed due to unreachable impalad" in str(e)
assert "Skipping retry of query_id=%s because the client has already " \
"fetched some rows" % handle.get_handle().id in str(e)
def test_original_query_cancel(self):
"""Test canceling a retryable query with spool_all_results_for_retries=true. Make sure
Coordinator::Wait() won't block in cancellation."""
for state in ['RUNNING', 'FINISHED']:
handle = self.execute_query_async(self._union_query, query_options={
'retry_failed_queries': 'true', 'spool_query_results': 'true',
'spool_all_results_for_retries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES[state], 60)
# Cancel the query.
# Assert that attempt to fetch from the query handle fails with a cancellation
# error
self.client.fetch(self._union_query, handle)
assert False
except Exception as e:
assert "Cancelled" in str(e)
def test_retry_finished_query(self):
"""Test that queries in FINISHED state can still be retried before the client fetch
any rows. Sets batch_size to 1 so results will be available as soon as possible.
The query state becomes FINISHED when results are available."""
query = "select * from functional.alltypes where bool_col = sleep(50)"
# IMPALA-9856: The retry behavior that is being exercised in this test only applies
# for query without result spooling enabled. Thus, we explicitly set
# spool_query_results as false.
query_options = {'retry_failed_queries': 'true', 'batch_size': '1',
'spool_query_results': 'false'}
handle = self.execute_query_async(query, query_options)
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
self.client.fetch(query, handle)
# Verifies the query is retried.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
def test_retry_query_cancel(self):
"""Trigger a query retry, and then cancel the retried query. Validate that the
cancelled query fails with the correct error message. Set a really high statestore
heartbeat frequency so that killed impalads are not removed from the cluster
impalad_service = self.cluster.get_first_impalad().service
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 60)
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# Validate that the query was retried.
profile_retried_query_id = \
self.__validate_runtime_profiles_from_service(impalad_service, handle)
assert profile_retried_query_id == retried_query_id
self.__validate_client_log(handle, retried_query_id)
# Cancel the query.
# Assert than attempt to fetch from the query handle fails with a cancellation
# error
self.client.fetch(query, handle)
assert False
except Exception, e:
assert "Cancelled" in str(e)
def test_retrying_query_cancel(self):
"""Trigger a query retry, and then cancel and close the retried query in RETRYING
state. Validate that it doesn't crash the impalad. Set a really high statestore
heartbeat frequency so that killed impalads are not removed from the cluster
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
self.__wait_until_retry_state(handle, 'RETRYING')
# Cancel the query.
# Check the original query retry status.
profile = self.__get_original_query_profile(handle.get_handle().id)
retry_status ="Retry Status: (.*)", profile)
assert == 'RETRYING'
assert self.cluster.impalads[0].get_pid() is not None, "Coordinator crashed"
def test_retry_query_close_before_getting_query_driver(self):
"""Trigger a query retry, and then close the retried query before getting
the query driver. Validate that it doesn't crash the impalad.
Set a really high statestore heartbeat frequency so that killed impalads are not
removed from the cluster membership."""
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true'})
# close the query
impala_service = self.cluster.get_first_impalad().service
self.assert_eventually(60, 0.1,
lambda: impala_service.get_num_in_flight_queries() == 0,
lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
def test_retry_query_result_cacheing_failed(self):
"""Test setting up results cacheing failed."""
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
# The number of in-flight queries is 0 at the beginning, then 1 when the original
# query is submitted. It's 2 when the retried query is registered. Although the retry
# will immediately fail due to the debug action and the 2 queries are unregistered,
# the number can't come back to 0 immediately. The reason is the registered queries
# are cleaned up by a backend thread. Sleep a while to make sure these finish.
# TODO(IMPALA-10705): Verify the retry failure.
# No queries running at the end.
impala_service = self.cluster.get_first_impalad().service
self.assert_eventually(60, 0.1,
lambda: impala_service.get_num_in_flight_queries() == 0,
lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
def test_retry_query_set_query_in_flight_failed(self):
"""Test setting query in flight failed."""
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
self.execute_query_async(query, query_options={'retry_failed_queries': 'true'})
# The number of in-flight queries is 0 at the beginning, then 1 when the original
# query is submitted. It's 2 when the retried query is registered. Although the retry
# will immediately fail due to the debug action and the 2 queries are unregistered,
# the number can't come back to 0 immediately. The reason is the registered queries
# are cleaned up by a backend thread. Sleep a while to make sure these finish.
# TODO(IMPALA-10705): Verify the retry failure.
# No queries running at the end.
impala_service = self.cluster.get_first_impalad().service
self.assert_eventually(60, 0.1,
lambda: impala_service.get_num_in_flight_queries() == 0,
lambda: "in-flight queries: %d" % impala_service.get_num_in_flight_queries())
def test_retry_query_timeout(self):
"""Trigger a query retry, and then leave the query handle open until the
'query_timeout_s' causes the handle to be closed. Assert that the runtime profile of
the original and retried queries are correct, and that the 'num-queries-expired'
metric is properly incremented. Set a really high statestore heartbeat frequency so
that killed impalads are not removed from the cluster membership."""
impalad_service = self.cluster.get_first_impalad().service
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
handle = self.execute_query_async(query,
query_options={'retry_failed_queries': 'true', 'query_timeout_s': '1'})
self.wait_for_state(handle, self.client.QUERY_STATES['EXCEPTION'], 60)
# Validate the live exec summary.
retried_query_id = self.__get_retried_query_id_from_summary(handle)
assert retried_query_id is not None
# Wait for the query timeout to expire the query handle.
# Validate that the query was retried.
profile_retried_query_id = \
self.__validate_runtime_profiles_from_service(impalad_service, handle)
assert profile_retried_query_id == retried_query_id
self.__validate_client_log(handle, retried_query_id)
# Assert than attempt to fetch from the query handle fails with a query expired
# error.
self.client.fetch(query, handle)
assert False
except Exception, e:
assert "expired due to client inactivity" in str(e)
# Assert that the impalad metrics show one expired query.
assert impalad_service.get_metric_value('impala-server.num-queries-expired') == 1
def test_retry_query_session_timeout(self):
"""Similar to 'test_retry_query_timeout' except with an idle session timeout."""
impalad_service = self.cluster.get_first_impalad().service
# Kill an impalad, and run a query. The query should be retried.
query = self._count_query
client = self.cluster.get_first_impalad().service.create_beeswax_client()
client.set_configuration({'retry_failed_queries': 'true'})
handle = client.execute_async(query)
self.wait_for_state(handle, client.QUERY_STATES['FINISHED'], 60, client=client)
# Wait for the idle session timeout to expire the session.
# Validate that the query was retried. Skip validating client log since we can't
# get it using the expired session.
self.__validate_runtime_profiles_from_service(impalad_service, handle)
# Assert than attempt to fetch from the query handle fails with a session expired
# error.
client.fetch(query, handle)
except Exception, e:
assert "Client session expired" in str(e)
# Assert that the impalad metrics show one expired session.
assert impalad_service.get_metric_value('impala-server.num-sessions-expired') == 1
def test_retry_query_hs2(self):
"""Test query retries with the HS2 protocol. Enable the results set cache as well and
test that query retries work with the results cache."""
query = self._count_query
self.hs2_client.set_configuration({'retry_failed_queries': 'true'})
self.hs2_client.set_configuration_option('impala.resultset.cache.size', '1024')
handle = self.hs2_client.execute_async(query)
self.wait_for_state(handle, 'FINISHED_STATE', 60, client=self.hs2_client)
results = self.hs2_client.fetch(query, handle)
assert results.success
assert len( == 1
assert[0] == self._count_query_result
# Validate the live exec summary.
retried_query_id = \
self.__get_retried_query_id_from_summary(handle, use_hs2_client=True)
assert retried_query_id is not None
# Validate the state of the runtime profiles.
retried_runtime_profile = self.hs2_client.get_runtime_profile(handle,
retried_runtime_profile, self.hs2_client.get_query_id(handle), retried_query_id)
self.__validate_client_log(handle, retried_query_id, use_hs2_client=True)
'', 1, timeout=60)
def __validate_runtime_profiles_from_service(self, impalad_service, handle):
"""Wrapper around '__validate_runtime_profiles' that first fetches the retried profile
from the web ui."""
original_profile = impalad_service.read_query_profile_page(handle.get_handle().id)
retried_query_id = self.__get_retried_query_id_from_profile(original_profile)
retried_profile = impalad_service.read_query_profile_page(retried_query_id)
retried_profile, handle.get_handle().id, retried_query_id)
return retried_query_id
def __get_retried_query_id_from_profile(self, profile):
"""Returns the entry for 'Retried Query Id' from the given profile, or 'None' if no
such entry exists."""
retried_query_id_search ="Retried Query Id: (.*)", profile)
if not retried_query_id_search: return None
def __wait_until_retry_state(self, handle, retry_state, timeout=300):
"""Wait until the given query handle has been retried. This is achieved by polling the
runtime profile of the query and checking the 'Retry Status' field."""
def __get_retry_status():
profile = self.__get_original_query_profile(handle.get_handle().id)
retry_status ="Retry Status: (.*)", profile)
return if retry_status else None
start_time = time.time()
retry_status = __get_retry_status()
while retry_status != retry_state and time.time() - start_time < timeout:"Wait 100ms for retry state {0}. Current retry state: {1}"
.format(retry_state, retry_status))
retry_status = __get_retry_status()
if retry_status != retry_state:
raise Timeout("query {0} was not retried within timeout".format
def __kill_random_impalad(self):
"""Kills a random impalad, except for the first node in the cluster, which should be
the Coordinator. Returns the killed impalad."""
killed_impalad = \
self.cluster.impalads[randint(1, ImpalaTestSuite.get_impalad_cluster_size() - 1)]
return killed_impalad
def __get_query_id_from_profile(self, profile):
"""Extracts and returns the query id of the given profile."""
query_id_search ="Query \(id=(.*)\)", profile)
assert query_id_search, "Invalid query profile, has no query id:\n{0}".format(
def __get_original_query_profile(self, original_query_id):
"""Returns the query profile of the original query attempt."""
# TODO (IMPALA-9229): there is no way to get the runtime profiles of the unsuccessful
# query attempts from the ImpalaServer, so fetch them from the debug UI instead.
return self.cluster.get_first_impalad().service.read_query_profile_page(
def __validate_original_id_in_profile(self, retried_runtime_profile, original_query_id):
"""Validate that the orginal query id is in the 'Original Query Id' entry of the
given retried runtime profile."""
original_id_pattern = "Original Query Id: (.*)"
original_id_search =, retried_runtime_profile)
assert original_id_search, \
"Could not find original id pattern '{0}' in profile:\n{1}".format(
original_id_pattern, retried_runtime_profile)
assert == original_query_id
def __validate_runtime_profiles(self, retried_runtime_profile, original_query_id,
""""Validate the runtime profiles of both the original and retried queries. The
'retried_runtime_profile' refers to the runtime profile of the retried query (the
most recent attempt of the query, which should have succeeded). The
'original_runtime_profile' refers to the runtime profile of the original query (the
original attempt of the query submitted by the user, which failed and had to be
# Check the retried query id in the retried runtime profile.
assert retried_query_id == self.__get_query_id_from_profile(retried_runtime_profile)
# Assert that the query id of the original query is in the runtime profile of the
# retried query.
# Get the original runtime profile from the retried runtime profile.
original_runtime_profile = self.__get_original_query_profile(original_query_id)
# Validate the contents of the original runtime profile.
self.__validate_original_runtime_profile(original_runtime_profile, retried_query_id)
# Assert that the query options from the original and retried queries are the same.
assert self.__get_query_options(original_runtime_profile) == \
def __get_query_options(self, profile):
"""Returns the query options from the given profile."""
query_options_pattern = "Query Options \(set by configuration and planner\): (.*)"
query_options =, profile)
assert query_options, profile
def __validate_original_runtime_profile(self, original_runtime_profile,
"""Validate the contents of the runtime profile of the original query after the query
has been retried."""
# The runtime profile of the original query should reflect that the query failed due
# a retryable error, and that it was retried.
assert "Query State: EXCEPTION" in original_runtime_profile, original_runtime_profile
assert "Impala Query State: ERROR" in original_runtime_profile, \
assert "Query Status: " in original_runtime_profile, \
assert "Retry Status: RETRIED" in original_runtime_profile, original_runtime_profile
assert "Retry Cause: " in original_runtime_profile, \
# Assert that the query id of the retried query is in the runtime profile of the
# original query.
assert "Retried Query Id: {0}".format(retried_query_id) \
in original_runtime_profile, original_runtime_profile
# Assert that the original query ran on all three nodes. All queries scan tables
# large enough such that scan fragments are scheduled on all impalads.
assert"PLAN FRAGMENT.*instances=3", original_runtime_profile), \
def __validate_web_ui_state(self):
"""Validate the state of the web ui after a query (or queries) have been retried.
The web ui should list 0 queries as in flight, running, or queued."""
impalad_service = self.cluster.get_first_impalad().service
# Assert that the debug web ui shows all queries as completed
self.assert_eventually(60, 0.1,
lambda: impalad_service.get_num_in_flight_queries() == 0)
assert impalad_service.get_num_running_queries('default-pool') == 0
assert impalad_service.get_num_queued_queries('default-pool') == 0
def __assert_executors_blacklisted(self, blacklisted_impalad, profile):
"""Validate that the given profile indicates that the given impalad was blacklisted
during query execution."""
assert "Blacklisted Executors: {0}:{1}".format(blacklisted_impalad.hostname,
blacklisted_impalad.service.krpc_port) in profile, profile
def __assert_executors_not_blacklisted(self, impalad, profile):
"""Validate that the given profile indicates that the given impalad was not
blacklisted during retried query execution"""
assert not ("Blacklisted Executors: {0}:{1}".format(impalad.hostname,
impalad.service.krpc_port) in profile), profile
def __assert_executors_not_assigned_any_finstance(self, impalad, profile):
"""Validate that the given profile indicates that the given impalad was not
assigned any fragment instance for query execution"""
assert not ("host={0}:{1}".format(impalad.hostname,
impalad.service.krpc_port) in profile), profile
def __validate_client_log(self, handle, retried_query_id, use_hs2_client=False):
"""Validate the GetLog result contains query retry information"""
if use_hs2_client:
client_log = self.hs2_client.get_log(handle)
client_log = self.client.get_log(handle)
assert "Original query failed:" in client_log
query_id_search ="Query has been retried using query id: (.*)\n",
assert query_id_search,\
"Invalid client log, has no retried query id. Log=%s" % client_log
assert == retried_query_id
def __get_retried_query_id_from_summary(self, handle, use_hs2_client=False):
if use_hs2_client:
summary = self.hs2_client.get_exec_summary(handle)
summary = self.client.get_exec_summary(handle)
if summary.error_logs:
for log in summary.error_logs:
query_id_search ="Retrying query using query id: (.*)", log)
if query_id_search:
return None
def __exist_queries_in_web_ui_memz(self):
memz_breakdown = self.cluster.get_first_impalad() \
query = re.compile("Query\([0-9a-f]{16}:[0-9a-f]{16}")
def __validate_memz(self):
# Validate that all queries are released
self.assert_eventually(60, 0.1,
lambda: self.__exist_queries_in_web_ui_memz() is None)
# Tests that verify the query-retries are properly triggered by disk IO failure.
# Coordinator adds an executor node to its blacklist if that node reports query
# execution status with error caused by its local faulty disk, then retries the failed
# query.
class TestQueryRetriesFaultyDisk(CustomClusterTestSuite):
def get_workload(cls):
return 'functional-query'
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestQueryRetriesFaultyDisk, cls).setup_class()
# Query with order by requires spill to disk if intermediate results don't fit in mem
spill_query = """
select o_orderdate, o_custkey, o_comment
from tpch.orders
order by o_orderdate
# Buffer pool limit that is low enough to force Impala to spill to disk when executing
# spill_query.
buffer_pool_limit = "45m"
def setup_method(self, method):
# Don't call the superclass method to prevent starting Impala before each test. In
# this class, each test is responsible for doing that because we want to generate
# the parameter string to start-impala-cluster in each test method.
self.created_dirs = []
def teardown_method(self, method):
for dir_path in self.created_dirs:
shutil.rmtree(dir_path, ignore_errors=True)
def __generate_scratch_dir(self, num):
result = []
for i in xrange(num):
dir_path = tempfile.mkdtemp()
print "Generated dir" + dir_path
return result
def __validate_web_ui_state(self):
"""Validate the state of the web ui after a query (or queries) have been retried.
The web ui should list 0 queries as in flight, running, or queued."""
impalad_service = self.cluster.get_first_impalad().service
# Assert that the debug web ui shows all queries as completed
self.assert_eventually(60, 0.1,
lambda: impalad_service.get_num_in_flight_queries() == 0)
assert impalad_service.get_num_running_queries('default-pool') == 0
assert impalad_service.get_num_queued_queries('default-pool') == 0
def test_retry_spill_to_disk_failed(self, vector):
""" Test that verifies that when an impalad failed during spill-to-disk due to disk
write error, it is properly blacklisted by coordinator and query-retry is
# Start cluster with spill-to-disk enabled and one dedicated coordinator. Set a
# really high statestore heartbeat frequency so that blacklisted nodes are not
# timeout too quickly.
scratch_dirs = self.__generate_scratch_dir(1)
'--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
self.assert_impalad_log_contains("INFO", "Using scratch directory ",
# Set debug_action to inject disk write error for spill-to-disk on impalad for which
# krpc port is 27001.
vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
vector.get_value('exec_option')['debug_action'] = \
vector.get_value('exec_option')['retry_failed_queries'] = "true"
coord_impalad = self.cluster.get_first_impalad()
client = coord_impalad.service.create_beeswax_client()
disk_failure_impalad = self.cluster.impalads[1]
assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
# Verify all nodes are active now.
backends_json = self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
assert backends_json["num_active_backends"] == 3, backends_json
assert len(backends_json["backends"]) == 3, backends_json
# Expect the impalad with disk failure is blacklisted, and query-retry is triggered
# and is completed successfully.
handle = self.execute_query_async_using_client(client, self.spill_query, vector)
results = client.fetch(self.spill_query, handle)
assert results.success
# Validate the state of the web ui. The query must be closed before validating the
# state since it asserts that no queries are in flight.
# Verify that the impalad with injected disk IO error is blacklisted.
backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
assert backends_json["num_blacklisted_backends"] == 1, backends_json
assert backends_json["num_active_backends"] == 2, backends_json
assert len(backends_json["backends"]) == 3, backends_json
num_blacklisted = 0
for backend_json in backends_json["backends"]:
if str(disk_failure_impalad.service.krpc_port) in backend_json["krpc_address"]:
assert backend_json["is_blacklisted"], backend_json
assert "Query execution failure caused by local disk IO fatal error on backend" \
in backend_json["blacklist_cause"]
num_blacklisted += 1
assert not backend_json["is_blacklisted"], backend_json
assert num_blacklisted == 1, backends_json
# Verify that the query is re-tried and finished.
completed_queries = coord_impalad.service.get_completed_queries()
# Assert that the most recently completed query is the retried query and it is marked
assert completed_queries[0]['state'] == 'FINISHED'
assert completed_queries[0]["rows_fetched"] == 1500000
# Assert that the second most recently completed query is the original query and it
# is marked as 'RETRIED'.
assert completed_queries[1]['state'] == 'RETRIED'
assert completed_queries[1]["rows_fetched"] == 0
assert completed_queries[1]['query_id'] == handle.get_handle().id