blob: b278b8bf17672bab1e10a9882406a19260e582b6 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
import pytest
import re
from beeswaxd.BeeswaxService import QueryState
from tests.common.skip import SkipIfNotHdfsMinicluster
from time import sleep
# Tests that verify the behavior of the executor blacklist.
@SkipIfNotHdfsMinicluster.tuned_for_minicluster
class TestBlacklist(CustomClusterTestSuite):
@classmethod
def get_workload(cls):
return 'functional-query'
@classmethod
def setup_class(cls):
if cls.exploration_strategy() != 'exhaustive':
pytest.skip('runs only in exhaustive')
super(TestBlacklist, cls).setup_class()
@pytest.mark.execute_serially
def test_kill_impalad(self, cursor):
"""Test that verifies that when an impalad is killed, it is properly blacklisted."""
# Run a query and verify that no impalads are blacklisted yet.
result = self.execute_query("select count(*) from tpch.lineitem")
assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
result.runtime_profile
# Kill an impalad
killed_impalad = self.cluster.impalads[2]
killed_impalad.kill()
# Run a query which should fail as the impalad hasn't been blacklisted yet.
try:
self.execute_query("select count(*) from tpch.lineitem")
assert False, "Query was expected to fail"
except Exception as e:
assert "Exec() rpc failed" in str(e)
# Run another query which should succeed and verify the impalad was blacklisted.
result = self.execute_query("select count(*) from tpch.lineitem")
match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
assert match.group(1) == "%s:%s" % \
(killed_impalad.hostname, killed_impalad.service.be_port), result.runtime_profile
# Sleep for long enough for the statestore to remove the impalad from the cluster
# membership, i.e. Statestore::FailedExecutorDetectionTime() + some padding
sleep(12)
# Run another query and verify nothing was blacklisted and only 2 backends were
# scheduled on.
result = self.execute_query("select count(*) from tpch.lineitem")
assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
result.runtime_profile
assert re.search("NumBackends: 2", result.runtime_profile), result.runtime_profile
@pytest.mark.execute_serially
def test_restart_impalad(self, cursor):
"""Test that verifies the behavior when an impalad is killed, blacklisted, and then
restarted."""
# Run a query and verify that no impalads are blacklisted yet.
result = self.execute_query("select count(*) from tpch.lineitem")
assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
result.runtime_profile
# Kill an impalad
killed_impalad = self.cluster.impalads[2]
killed_impalad.kill()
# Run a query which should fail as the impalad hasn't been blacklisted yet.
try:
self.execute_query("select count(*) from tpch.lineitem")
assert False, "Query was expected to fail"
except Exception as e:
assert "Exec() rpc failed" in str(e)
# Run another query which should succeed and verify the impalad was blacklisted.
result = self.execute_query("select count(*) from tpch.lineitem")
match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
assert match.group(1) == "%s:%s" % \
(killed_impalad.hostname, killed_impalad.service.be_port), result.runtime_profile
# Restart the impalad.
killed_impalad.start()
# Sleep for long enough for the statestore to update the membership to include the
# restarted impalad, ImpaladProcess.start() won't return until the Impalad says its
# ready to accept connections, at which point it will have already registered with the
# statestore, so we don't need to sleep very long.
sleep(2)
# Run another query and verify nothing was blacklisted and all 3 backends were
# scheduled on.
result = self.execute_query("select count(*) from tpch.lineitem")
assert re.search("Blacklisted Executors: (.*)", result.runtime_profile) is None, \
result.runtime_profile
assert re.search("NumBackends: 3", result.runtime_profile), result.runtime_profile
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(num_exclusive_coordinators=1)
def test_kill_impalad_with_running_queries(self, cursor):
"""Verifies that when an Impala executor is killed while running a query, that the
Coordinator blacklists the killed executor."""
# Run a query asynchronously. Normally, this query should take a few seconds to
# complete.
query = "select count(*) from tpch_parquet.lineitem t1, tpch_parquet.lineitem t2 \
where t1.l_orderkey = t2.l_orderkey"
handle = self.execute_query_async(query)
# Wait for the query to start running
self.wait_for_any_state(handle, [QueryState.RUNNING, QueryState.FINISHED], 10)
# Kill one of the Impala executors
killed_impalad = self.cluster.impalads[2]
killed_impalad.kill()
# Try to fetch results from the query. Fetch requests should fail because one of the
# impalads running the query was killed. When the query fails, the Coordinator should
# add the killed Impala executor to the blacklist (since a RPC to that node failed).
try:
self.client.fetch(query, handle)
assert False, "Query was expected to fail"
except Exception as e:
# The query should fail due to an RPC error.
assert "TransmitData() to " in str(e) or "EndDataStream() to " in str(e)
# Run another query which should succeed and verify the impalad was blacklisted.
result = self.execute_query("select count(*) from tpch.lineitem")
match = re.search("Blacklisted Executors: (.*)", result.runtime_profile)
assert match is not None and match.group(1) == "%s:%s" % \
(killed_impalad.hostname, killed_impalad.service.be_port), \
result.runtime_profile