from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
import pytest
import re
import shutil
import tempfile
from beeswaxd.BeeswaxService import QueryState
from tests.common.skip import SkipIfNotHdfsMinicluster
from tests.common.skip import SkipIfBuildType
from time import sleep
# The BE krpc port of the impalad to simulate disk errors in tests.
def _get_disk_write_fail_action(port):
return "IMPALA_TMP_FILE_WRITE:{port}:FAIL".format(port=port)
# Tests that verify the behavior of the executor blacklist caused by RPC failure.
# The coordinator adds an executor node to its blacklist if an RPC to that node fails.
# Note: query-retry is not enabled by default.
class TestBlacklist(CustomClusterTestSuite):

    @classmethod
    def get_workload(cls):
        return 'functional-query'

    @classmethod
    def setup_class(cls):
        if cls.exploration_strategy() != 'exhaustive':
            pytest.skip('runs only in exhaustive')
        super(TestBlacklist, cls).setup_class()

    @staticmethod
    def _blacklisted_executors(runtime_profile):
        """Return the value of the 'Blacklisted Executors: ...' line of a runtime profile,
        or None if the profile contains no such line."""
        match = re.search("Blacklisted Executors: (.*)", runtime_profile)
        return match.group(1) if match is not None else None

    @staticmethod
    def _krpc_address(impalad):
        """Return 'hostname:krpc_port' for 'impalad', the form the tests expect in the
        'Blacklisted Executors' profile line."""
        return "%s:%s" % (impalad.hostname, impalad.service.krpc_port)

    def test_kill_impalad(self, cursor):
        """Test that verifies that when an impalad is killed, it is properly blacklisted."""
        # Run a query and verify that no impalads are blacklisted yet.
        result = self.execute_query("select count(*) from tpch.lineitem")
        assert self._blacklisted_executors(result.runtime_profile) is None, \
            result.runtime_profile

        # Kill an impalad.
        killed_impalad = self.cluster.impalads[2]
        killed_impalad.kill()

        # Run a query which should fail as the impalad hasn't been blacklisted yet.
        # pytest.raises (unlike 'try: ...; assert False' inside 'except Exception') does
        # not swallow the failure reported when the query unexpectedly succeeds.
        with pytest.raises(Exception) as exc_info:
            self.execute_query("select count(*) from tpch.lineitem")
        assert "Exec() rpc failed" in str(exc_info.value), str(exc_info.value)

        # Run another query which should succeed and verify the impalad was blacklisted.
        result = self.execute_query("select count(*) from tpch.lineitem")
        backends_json = self.cluster.impalads[0].service.get_debug_webpage_json("/backends")
        assert self._blacklisted_executors(result.runtime_profile) == \
            self._krpc_address(killed_impalad), result.runtime_profile
        assert backends_json["num_blacklisted_backends"] == 1, backends_json
        assert backends_json["num_active_backends"] == 2, backends_json
        assert len(backends_json["backends"]) == 3, backends_json
        killed_port = str(killed_impalad.service.krpc_port)
        num_blacklisted = 0
        for backend_json in backends_json["backends"]:
            if killed_port in backend_json["krpc_address"]:
                assert backend_json["is_blacklisted"], backend_json
                num_blacklisted += 1
            else:
                assert not backend_json["is_blacklisted"], backend_json
        assert num_blacklisted == 1, backends_json

        # Sleep for long enough for the statestore to remove the impalad from the cluster
        # membership, i.e. Statestore::FailedExecutorDetectionTime() + some padding.
        # NOTE(review): 12s assumes the default statestore heartbeat settings; confirm.
        sleep(12)

        # Run another query and verify nothing was blacklisted and only 2 backends were
        # scheduled on.
        result = self.execute_query("select count(*) from tpch.lineitem")
        assert self._blacklisted_executors(result.runtime_profile) is None, \
            result.runtime_profile
        assert re.search("NumBackends: 2", result.runtime_profile), result.runtime_profile

    def test_restart_impalad(self, cursor):
        """Test that verifies the behavior when an impalad is killed, blacklisted, and then
        restarted."""
        # Run a query and verify that no impalads are blacklisted yet.
        result = self.execute_query("select count(*) from tpch.lineitem")
        assert self._blacklisted_executors(result.runtime_profile) is None, \
            result.runtime_profile

        # Kill an impalad.
        killed_impalad = self.cluster.impalads[2]
        killed_impalad.kill()

        # Run a query which should fail as the impalad hasn't been blacklisted yet.
        with pytest.raises(Exception) as exc_info:
            self.execute_query("select count(*) from tpch.lineitem")
        assert "Exec() rpc failed" in str(exc_info.value), str(exc_info.value)

        # Run another query which should succeed and verify the impalad was blacklisted.
        result = self.execute_query("select count(*) from tpch.lineitem")
        assert self._blacklisted_executors(result.runtime_profile) == \
            self._krpc_address(killed_impalad), result.runtime_profile

        # Restart the impalad.
        killed_impalad.start()

        # Sleep for long enough for the statestore to update the membership to include the
        # restarted impalad. ImpaladProcess.start() won't return until the impalad says it's
        # ready to accept connections, at which point it will have already registered with
        # the statestore, so we don't need to sleep very long.
        sleep(2)

        # Run another query and verify nothing was blacklisted and all 3 backends were
        # scheduled on.
        result = self.execute_query("select count(*) from tpch.lineitem")
        assert self._blacklisted_executors(result.runtime_profile) is None, \
            result.runtime_profile
        assert re.search("NumBackends: 3", result.runtime_profile), result.runtime_profile

    def test_kill_impalad_with_running_queries(self, cursor):
        """Verifies that when an Impala executor is killed while running a query, the
        coordinator blacklists the killed executor."""
        # Run a query asynchronously. With the debug actions, this query should take a few
        # minutes to complete.
        query = ("select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 "
                 "where t1.l_orderkey = t2.l_orderkey")
        handle = self.execute_query_async(query, query_options={
            'debug_action': '0:GETNEXT:DELAY|1:GETNEXT:DELAY'})

        # Wait for the query to start running.
        self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)

        # Kill one of the Impala executors.
        killed_impalad = self.cluster.impalads[2]
        killed_impalad.kill()

        # Try to fetch results from the query. Fetch requests should fail because one of the
        # impalads running the query was killed. When the query fails, the coordinator should
        # add the killed Impala executor to the blacklist (since an RPC to that node failed).
        with pytest.raises(Exception) as exc_info:
            self.client.fetch(query, handle)
        # The query should fail due to an RPC error.
        error = str(exc_info.value)
        assert "TransmitData() to " in error or "EndDataStream() to " in error, error

        # Run another query which should succeed and verify the impalad was blacklisted.
        self.client.clear_configuration()  # remove the debug actions
        result = self.execute_query("select count(*) from tpch.lineitem")
        assert self._blacklisted_executors(result.runtime_profile) == \
            self._krpc_address(killed_impalad), result.runtime_profile


# Tests that verify the behavior of the executor blacklist caused by disk IO failure.
# The coordinator adds an executor node to its blacklist if that node reports a query
# execution error caused by its own faulty local disk.
# Note: query-retry is not enabled by default, and the following tests assume it is not
# enabled.
class TestBlacklistFaultyDisk(CustomClusterTestSuite):

    @classmethod
    def get_workload(cls):
        return 'functional-query'

    @classmethod
    def setup_class(cls):
        if cls.exploration_strategy() != 'exhaustive':
            pytest.skip('runs only in exhaustive')
        super(TestBlacklistFaultyDisk, cls).setup_class()

    # Query with order by requires spill to disk if intermediate results don't fit in
    # memory.
    spill_query = """
        select o_orderdate, o_custkey, o_comment
        from tpch.orders
        order by o_orderdate
        """
    # Query against a big table with order by requires spill to disk if intermediate
    # results don't fit in memory.
    spill_query_big_table = """
        select l_orderkey, l_linestatus, l_shipdate, l_comment
        from tpch.lineitem
        order by l_orderkey
        """
    # Query without order by can be executed without spilling to disk.
    in_mem_query = """
        select o_orderdate, o_custkey, o_comment from tpch.orders
        """
    # Buffer pool limit that is low enough to force Impala to spill to disk when executing
    # spill_query.
    buffer_pool_limit = "45m"

    def __generate_scratch_dir(self, num):
        """Create 'num' new temporary directories and return their paths.

        Every directory is also recorded in self.created_dirs so that teardown_method
        removes it.
        """
        result = []
        for _ in range(num):
            dir_path = tempfile.mkdtemp()
            self.created_dirs.append(dir_path)
            result.append(dir_path)
            print("Generated dir" + dir_path)
        return result

    def setup_method(self, method):
        # Don't call the superclass method to prevent starting Impala before each test. In
        # this class, each test is responsible for doing that because we want to generate
        # the parameter string to start-impala-cluster in each test method.
        self.created_dirs = []

    def teardown_method(self, method):
        # Best-effort removal of the scratch directories created by the test.
        for dir_path in self.created_dirs:
            shutil.rmtree(dir_path, ignore_errors=True)

    def test_scratch_file_write_failure(self, vector):
        """Test that verifies that when an impalad fails to execute a query during
        spill-to-disk due to a disk write error, it is properly blacklisted by the
        coordinator."""
        disk_error_msg = \
            "Query execution failure caused by local disk IO fatal error on backend"

        # Start the cluster with spill-to-disk enabled and one dedicated coordinator. Use a
        # long statestore heartbeat interval so that blacklisted nodes do not time out of
        # the blacklist too quickly.
        scratch_dirs = self.__generate_scratch_dir(2)
        # NOTE(review): the flags before '--cluster_size' were reconstructed from the
        # comment above; confirm the flag names and the heartbeat interval.
        self._start_impala_cluster([
            '--impalad_args=--scratch_dirs={0}'.format(','.join(scratch_dirs)),
            '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
            '--impalad_args=--statestore_heartbeat_frequency_ms=2000',
            '--cluster_size=3', '--num_coordinators=1', '--use_exclusive_coordinators'])
        self.assert_impalad_log_contains("INFO", "Using scratch directory ",
                                         expected_count=len(scratch_dirs))

        # First run with an empty debug_action. The exec_option dict returned by the
        # vector is mutated in place and read by every later query in this test.
        exec_options = vector.get_value('exec_option')
        exec_options['buffer_pool_limit'] = self.buffer_pool_limit
        exec_options['debug_action'] = ''
        coord_impalad = self.cluster.get_first_impalad()
        client = coord_impalad.service.create_beeswax_client()
        try:
            # Expect spill to disk to succeed with an empty debug_action. Verify all nodes
            # are active.
            handle = self.execute_query_async_using_client(client, self.spill_query, vector)
            results = client.fetch(self.spill_query, handle)
            assert results.success
            backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
            assert backends_json["num_active_backends"] == 3, backends_json
            assert len(backends_json["backends"]) == 3, backends_json

            # Set debug_action to inject a disk write error for spill-to-disk on the
            # impalad whose krpc port is FAILED_KRPC_PORT (27001).
            exec_options['debug_action'] = _get_disk_write_fail_action(FAILED_KRPC_PORT)

            # Should be able to execute an in-memory query.
            handle = self.execute_query_async_using_client(
                client, self.in_mem_query, vector)
            results = client.fetch(self.in_mem_query, handle)
            assert results.success

            # Expect spill to disk to fail due to the disk failure on that impalad. Verify
            # exactly one node is blacklisted.
            disk_failure_impalad = self.cluster.impalads[1]
            assert disk_failure_impalad.service.krpc_port == FAILED_KRPC_PORT
            with pytest.raises(Exception) as exc_info:
                handle = self.execute_query_async_using_client(
                    client, self.spill_query, vector)
                client.fetch(self.spill_query, handle)
            assert disk_error_msg in str(exc_info.value), str(exc_info.value)

            backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
            assert backends_json["num_blacklisted_backends"] == 1, backends_json
            assert backends_json["num_active_backends"] == 2, backends_json
            assert len(backends_json["backends"]) == 3, backends_json
            failed_port = str(disk_failure_impalad.service.krpc_port)
            num_blacklisted = 0
            for backend_json in backends_json["backends"]:
                if failed_port in backend_json["krpc_address"]:
                    assert backend_json["is_blacklisted"], backend_json
                    assert disk_error_msg in backend_json["blacklist_cause"], backend_json
                    num_blacklisted += 1
                else:
                    assert not backend_json["is_blacklisted"], backend_json
            assert num_blacklisted == 1, backends_json

            # Should be able to re-execute the same query since the impalad with the
            # injected disk error for spill-to-disk was blacklisted.
            handle = self.execute_query_async_using_client(client, self.spill_query, vector)
            results = client.fetch(self.spill_query, handle)
            assert results.success
            # Verify that the runtime profile contains the "Blacklisted Executors" line
            # with the corresponding backend.
            runtime_profile = client.get_runtime_profile(handle)
            match = re.search("Blacklisted Executors: (.*)", runtime_profile)
            expected = "%s:%s" % (disk_failure_impalad.hostname,
                                  disk_failure_impalad.service.krpc_port)
            assert match is not None and match.group(1) == expected, runtime_profile

            # Sleep for long enough and verify the blacklisted backend was removed from
            # the blacklist after the blacklisting timeout.
            # NOTE(review): 30s is assumed to exceed the blacklist timeout implied by the
            # heartbeat interval configured above; confirm.
            sleep(30)
            backends_json = coord_impalad.service.get_debug_webpage_json("/backends")
            assert backends_json["num_active_backends"] == 3, backends_json

            # Run the query again without the debug action and verify nothing was
            # blacklisted and all 3 backends were scheduled on.
            exec_options['debug_action'] = ''
            handle = self.execute_query_async_using_client(client, self.spill_query, vector)
            results = client.fetch(self.spill_query, handle)
            assert results.success
            runtime_profile = client.get_runtime_profile(handle)
            assert re.search("Blacklisted Executors: (.*)", runtime_profile) is None, \
                runtime_profile
            assert re.search("NumBackends: 3", runtime_profile), runtime_profile
        finally:
            # The client was created explicitly above, so release its connection even
            # when an assertion fails.
            client.close()