blob: 00ce672939453a4b532e3fc07276ff1dc482fc79 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
import pytest
import tests.common.cluster_config as cluster_config
from tests.common.custom_cluster_test_suite import CustomClusterTestSuite
from tests.common.test_result_verifier import error_msg_startswith
from tests.util.cancel_util import (
QueryToKill,
assert_kill_error,
assert_kill_ok
)
class TestKillQuery(CustomClusterTestSuite):
@pytest.mark.execute_serially
def test_coordinator_unreachable(self):
"""
The coordinator of the query to kill is unreachable.
It is required that each impalad in the cluster is a coordinator.
"""
protocol = 'hs2'
with self.create_client_for_nth_impalad(0, protocol) as client, \
QueryToKill(
self,
protocol,
check_on_exit=False,
nth_impalad=2) as query_id_to_kill:
coordinator_to_kill = self.cluster.impalads[2]
coordinator_to_kill.kill()
assert_kill_error(
client,
"KillQuery() RPC failed: Network error:",
query_id=query_id_to_kill,
)
@pytest.mark.execute_serially
@CustomClusterTestSuite.with_args(force_restart=True)
def test_another_coordinator_unreachable(self):
"""
A coordinator other than the one of the query to kill is unreachable.
It is required that each impalad in the cluster is a coordinator.
"""
protocol = 'hs2'
with self.create_client_for_nth_impalad(0, protocol) as client, \
QueryToKill(self, protocol, nth_impalad=2) as query_id_to_kill:
coordinator_to_kill = self.cluster.impalads[1] # impalad 1 is between 0 and 2.
coordinator_to_kill.kill()
assert_kill_ok(client, query_id_to_kill)
@pytest.mark.execute_serially
@cluster_config.single_coordinator
def test_single_coordinator(self):
"""
Test when there is only one coordinator in the cluster.
"""
protocol = 'hs2'
with self.create_client_for_nth_impalad(0, protocol) as client:
assert_kill_error(
client,
"Could not find query on any coordinator.",
query_id='123:456')
@pytest.mark.execute_serially
@cluster_config.admit_one_query_at_a_time
def test_admit_one_query_at_a_time(self):
"""
Make sure queries can be killed when only one query is allowed to run at a time.
"""
protocol = 'hs2'
with self.create_client_for_nth_impalad(0, protocol) as client, \
QueryToKill(self, protocol) as query_id_to_kill:
assert_kill_ok(client, query_id_to_kill)
@pytest.mark.execute_serially
@cluster_config.admit_no_query
def test_admit_no_query(self):
"""
Make sure KILL QUERY statement can be executed when no query will be admitted.
This is to show that KILL QUERY statements are not subject to admission control.
"""
protocol = 'hs2'
with self.create_client_for_nth_impalad(0, protocol) as client:
try:
client.execute("SELECT 1")
except Exception as e:
expected_msg = (
"Rejected query from pool default-pool: "
"disabled by requests limit set to 0"
)
assert error_msg_startswith(str(e), expected_msg)
assert_kill_error(
client,
"Could not find query on any coordinator",
query_id="123:456"
)
@cluster_config.enable_authorization
class TestKillQueryAuthorization(CustomClusterTestSuite):
@pytest.mark.execute_serially
def test_kill_as_admin(self):
# ImpylaHS2Connection does not support authentication yet.
protocol = 'beeswax'
with self.create_client_for_nth_impalad(0, protocol) as client, \
QueryToKill(self, protocol, user="user1") as query_id_to_kill:
assert_kill_ok(client, query_id_to_kill, user=cluster_config.ADMIN)
@pytest.mark.execute_serially
def test_kill_as_non_admin(self):
# ImpylaHS2Connection does not support authentication yet.
protocol = 'beeswax'
user1, user2 = "user1", "user2"
with self.create_client_for_nth_impalad(0, protocol) as user1_client, \
self.create_client_for_nth_impalad(0, protocol) as user2_client, \
QueryToKill(self, protocol, user=user1) as query_id_to_kill:
assert_kill_error(
user2_client,
"User '{0}' is not authorized to kill the query.".format(user2),
query_id=query_id_to_kill,
user=user2,
)
assert_kill_ok(user1_client, query_id_to_kill, user=user1)